diff --git a/examples/blend_and_shuffle.py b/examples/blend_and_shuffle.py index c78a3f03..bd7fbd9d 100644 --- a/examples/blend_and_shuffle.py +++ b/examples/blend_and_shuffle.py @@ -17,7 +17,7 @@ import nemo_curator as nc from nemo_curator.datasets import DocumentDataset from nemo_curator.utils.distributed_utils import get_client -from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args +from nemo_curator.utils.script_utils import ArgumentHelper def main(args): @@ -28,7 +28,7 @@ def main(args): output_path = "/path/to/output" # Set up Dask client - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) # Blend the datasets datasets = [DocumentDataset.read_json(path) for path in dataset_paths] @@ -46,7 +46,7 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter ), ): - return add_distributed_args(parser) + return ArgumentHelper(parser).add_distributed_args() if __name__ == "__main__": diff --git a/examples/classifier_filtering.py b/examples/classifier_filtering.py index c8c52bdf..a6476e39 100644 --- a/examples/classifier_filtering.py +++ b/examples/classifier_filtering.py @@ -23,7 +23,7 @@ from nemo_curator.modifiers import FastTextLabelModifier from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk from nemo_curator.utils.file_utils import get_all_files_paths_under -from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args +from nemo_curator.utils.script_utils import ArgumentHelper def load_dataset(input_data_dir): @@ -55,7 +55,7 @@ def main(args): filtered_output = "/path/to/output" # Prepare samples for the classifier - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) low_quality_samples = create_samples( low_quality_data_path, "__label__lq", num_low_quality_samples ) @@ -100,7 +100,7 @@ def attach_args( 
formatter_class=argparse.ArgumentDefaultsHelpFormatter ), ): - return add_distributed_args(parser) + return ArgumentHelper(parser).add_distributed_args() if __name__ == "__main__": diff --git a/examples/domain_classifier_example.py b/examples/domain_classifier_example.py index 8a023141..d4709822 100644 --- a/examples/domain_classifier_example.py +++ b/examples/domain_classifier_example.py @@ -19,7 +19,7 @@ from nemo_curator import DomainClassifier from nemo_curator.datasets import DocumentDataset from nemo_curator.utils.distributed_utils import get_client -from nemo_curator.utils.script_utils import parse_client_args +from nemo_curator.utils.script_utils import ArgumentHelper def main(args): @@ -60,7 +60,7 @@ def main(args): input_file_path = "/path/to/data" output_file_path = "./" - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) input_dataset = DocumentDataset.read_json( input_file_path, backend="cudf", add_filename=True @@ -89,54 +89,18 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter ), ): - parser.add_argument( - "--scheduler-address", - type=str, - default=None, - help="Address to the scheduler of a created dask cluster. If not provided" - "a single node LocalCUDACluster will be started.", - ) - parser.add_argument( - "--scheduler-file", - type=str, - default=None, - help="Path to the scheduler file of a created dask cluster. If not provided" - " a single node LocalCUDACluster will be started.", - ) - parser.add_argument( - "--nvlink-only", - action="store_true", - help="Start a local cluster with only NVLink enabled." - "Only applicable when protocol=ucx and no scheduler file/address is specified", - ) - parser.add_argument( - "--protocol", - type=str, - default="ucx", - help="Protcol to use for dask cluster" - "Note: This only applies to the localCUDACluster. 
If providing an user created " - "cluster refer to" - "https://docs.rapids.ai/api/dask-cuda/stable/api.html#cmdoption-dask-cuda-protocol", # noqa: E501 - ) - parser.add_argument( - "--rmm-pool-size", - type=str, - default="14GB", - help="Initial pool size to use for the RMM Pool Memory allocator" - "Note: This only applies to the localCUDACluster. If providing an user created " - "cluster refer to" - "https://docs.rapids.ai/api/dask-cuda/stable/api.html#cmdoption-dask-cuda-rmm-pool-size", # noqa: E501 - ) - parser.add_argument("--enable-spilling", action="store_true") - parser.add_argument("--set-torch-to-use-rmm", action="store_true") - parser.add_argument( - "--device", - type=str, - default="gpu", - help="Device to run the script on. Either 'cpu' or 'gpu'.", - ) - - return parser + argumentHelper = ArgumentHelper(parser) + + argumentHelper.add_arg_device() + argumentHelper.add_arg_enable_spilling() + argumentHelper.add_arg_nvlink_only() + argumentHelper.add_arg_protocol() + argumentHelper.add_arg_rmm_pool_size() + argumentHelper.add_arg_scheduler_address() + argumentHelper.add_arg_scheduler_file() + argumentHelper.add_arg_set_torch_to_use_rmm() + + return argumentHelper.parser if __name__ == "__main__": diff --git a/examples/download_arxiv.py b/examples/download_arxiv.py index 084e7d10..81d68be8 100644 --- a/examples/download_arxiv.py +++ b/examples/download_arxiv.py @@ -16,7 +16,7 @@ from nemo_curator.download import download_arxiv from nemo_curator.utils.distributed_utils import get_client -from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args +from nemo_curator.utils.script_utils import ArgumentHelper def main(args): @@ -27,7 +27,7 @@ def main(args): url_limit = 10 # Set up Dask client - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) # Download and sample data arxiv = download_arxiv(output_directory, url_limit=url_limit) @@ -42,7 +42,7 @@ def attach_args( 
formatter_class=argparse.ArgumentDefaultsHelpFormatter ), ): - return add_distributed_args(parser) + return ArgumentHelper(parser).add_distributed_args() if __name__ == "__main__": diff --git a/examples/download_common_crawl.py b/examples/download_common_crawl.py index 440d2069..7466a6e5 100644 --- a/examples/download_common_crawl.py +++ b/examples/download_common_crawl.py @@ -16,7 +16,7 @@ from nemo_curator.download import download_common_crawl from nemo_curator.utils.distributed_utils import get_client -from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args +from nemo_curator.utils.script_utils import ArgumentHelper def main(args): @@ -29,7 +29,7 @@ def main(args): url_limit = 10 # Set up Dask client - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) # Download and sample data common_crawl = download_common_crawl( @@ -46,7 +46,7 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter ), ): - return add_distributed_args(parser) + return ArgumentHelper(parser).add_distributed_args() if __name__ == "__main__": diff --git a/examples/download_wikipedia.py b/examples/download_wikipedia.py index c3b5ca6a..93c0c9b2 100644 --- a/examples/download_wikipedia.py +++ b/examples/download_wikipedia.py @@ -16,7 +16,7 @@ from nemo_curator.download import download_wikipedia from nemo_curator.utils.distributed_utils import get_client -from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args +from nemo_curator.utils.script_utils import ArgumentHelper def main(args): @@ -28,7 +28,7 @@ def main(args): url_limit = 10 # Set up Dask client - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) # Download and sample data wikipedia = download_wikipedia( @@ -45,7 +45,7 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter ), ): - return add_distributed_args(parser) + return 
ArgumentHelper(parser).add_distributed_args() if __name__ == "__main__": diff --git a/examples/exact_deduplication.py b/examples/exact_deduplication.py index 215883ec..81a2d66c 100644 --- a/examples/exact_deduplication.py +++ b/examples/exact_deduplication.py @@ -19,7 +19,7 @@ from nemo_curator.modules import ExactDuplicates from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk from nemo_curator.utils.file_utils import get_all_files_paths_under -from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args +from nemo_curator.utils.script_utils import ArgumentHelper def pre_imports(): @@ -33,7 +33,7 @@ def main(args): output_dir = "./" dataset_id_field = "id" dataset_text_field = "text" - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) backend = "cudf" if args.device == "gpu" else "pandas" if args.device == "gpu": @@ -79,7 +79,7 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter ), ): - return add_distributed_args(parser) + return ArgumentHelper(parser).add_distributed_args() if __name__ == "__main__": diff --git a/examples/find_pii_and_deidentify.py b/examples/find_pii_and_deidentify.py index 4e5c1dc1..fe51ee8c 100644 --- a/examples/find_pii_and_deidentify.py +++ b/examples/find_pii_and_deidentify.py @@ -21,13 +21,13 @@ from nemo_curator.modifiers.pii_modifier import PiiModifier from nemo_curator.modules.modify import Modify from nemo_curator.utils.distributed_utils import get_client -from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args +from nemo_curator.utils.script_utils import ArgumentHelper def console_script(): parser = argparse.ArgumentParser() - arguments = add_distributed_args(parser).parse_args() - _ = get_client(**parse_client_args(arguments)) + args = ArgumentHelper(parser).add_distributed_args().parse_args() + _ = get_client(**ArgumentHelper.parse_client_args(args)) dataframe = 
pd.DataFrame( {"text": ["Sarah and Ryan went out to play", "Jensen is the CEO of NVIDIA"]} diff --git a/examples/fuzzy_deduplication.py b/examples/fuzzy_deduplication.py index 1fe8cafd..93dc869d 100644 --- a/examples/fuzzy_deduplication.py +++ b/examples/fuzzy_deduplication.py @@ -21,7 +21,7 @@ from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig from nemo_curator.datasets import DocumentDataset from nemo_curator.utils.distributed_utils import get_client, write_to_disk -from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args +from nemo_curator.utils.script_utils import ArgumentHelper def pre_imports(): @@ -44,7 +44,7 @@ def main(args): assert args.device == "gpu" with dask.config.set({"dataframe.backend": backend}): - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) client.run(pre_imports) t0 = time.time() @@ -102,7 +102,7 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter ), ): - return add_distributed_args(parser) + return ArgumentHelper(parser).add_distributed_args() if __name__ == "__main__": diff --git a/examples/identify_languages_and_fix_unicode.py b/examples/identify_languages_and_fix_unicode.py index dc011ed5..92f628e3 100644 --- a/examples/identify_languages_and_fix_unicode.py +++ b/examples/identify_languages_and_fix_unicode.py @@ -24,7 +24,7 @@ get_all_files_paths_under, separate_by_metadata, ) -from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args +from nemo_curator.utils.script_utils import ArgumentHelper def load_dataset(input_data_dir): @@ -49,7 +49,7 @@ def main(args): language_field = "language" # Prepare samples for the classifier - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) # Filter data multilingual_dataset = load_dataset(multilingual_data_path) @@ -88,7 +88,7 @@ def attach_args( 
formatter_class=argparse.ArgumentDefaultsHelpFormatter ), ): - return add_distributed_args(parser) + return ArgumentHelper(parser).add_distributed_args() if __name__ == "__main__": diff --git a/examples/quality_classifier_example.py b/examples/quality_classifier_example.py index cd870db3..277200c0 100644 --- a/examples/quality_classifier_example.py +++ b/examples/quality_classifier_example.py @@ -19,7 +19,7 @@ from nemo_curator import QualityClassifier from nemo_curator.datasets import DocumentDataset from nemo_curator.utils.distributed_utils import get_client -from nemo_curator.utils.script_utils import parse_client_args +from nemo_curator.utils.script_utils import ArgumentHelper def main(args): @@ -32,7 +32,7 @@ def main(args): input_file_path = "/path/to/data" output_file_path = "./" - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) input_dataset = DocumentDataset.read_json( input_file_path, backend="cudf", add_filename=True @@ -60,54 +60,18 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter ), ): - parser.add_argument( - "--scheduler-address", - type=str, - default=None, - help="Address to the scheduler of a created dask cluster. If not provided" - "a single node LocalCUDACluster will be started.", - ) - parser.add_argument( - "--scheduler-file", - type=str, - default=None, - help="Path to the scheduler file of a created dask cluster. If not provided" - " a single node LocalCUDACluster will be started.", - ) - parser.add_argument( - "--nvlink-only", - action="store_true", - help="Start a local cluster with only NVLink enabled." - "Only applicable when protocol=ucx and no scheduler file/address is specified", - ) - parser.add_argument( - "--protocol", - type=str, - default="ucx", - help="Protcol to use for dask cluster" - "Note: This only applies to the localCUDACluster. 
If providing an user created " - "cluster refer to" - "https://docs.rapids.ai/api/dask-cuda/stable/api.html#cmdoption-dask-cuda-protocol", # noqa: E501 - ) - parser.add_argument( - "--rmm-pool-size", - type=str, - default="14GB", - help="Initial pool size to use for the RMM Pool Memory allocator" - "Note: This only applies to the localCUDACluster. If providing an user created " - "cluster refer to" - "https://docs.rapids.ai/api/dask-cuda/stable/api.html#cmdoption-dask-cuda-rmm-pool-size", # noqa: E501 - ) - parser.add_argument("--enable-spilling", action="store_true") - parser.add_argument("--set-torch-to-use-rmm", action="store_true") - parser.add_argument( - "--device", - type=str, - default="gpu", - help="Device to run the script on. Either 'cpu' or 'gpu'.", - ) - - return parser + argumentHelper = ArgumentHelper(parser) + + argumentHelper.add_arg_device() + argumentHelper.add_arg_enable_spilling() + argumentHelper.add_arg_nvlink_only() + argumentHelper.add_arg_protocol() + argumentHelper.add_arg_rmm_pool_size() + argumentHelper.add_arg_scheduler_address() + argumentHelper.add_arg_scheduler_file() + argumentHelper.add_arg_set_torch_to_use_rmm() + + return argumentHelper.parser if __name__ == "__main__": diff --git a/examples/raw_download_common_crawl.py b/examples/raw_download_common_crawl.py index 7595e1cb..522ad5f9 100644 --- a/examples/raw_download_common_crawl.py +++ b/examples/raw_download_common_crawl.py @@ -18,7 +18,7 @@ from nemo_curator.utils.distributed_utils import get_client from nemo_curator.utils.download_utils import get_common_crawl_urls from nemo_curator.utils.file_utils import expand_outdir_and_mkdir -from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args +from nemo_curator.utils.script_utils import ArgumentHelper def main(args): @@ -31,7 +31,7 @@ def main(args): url_limit = 10 # Set up Dask client - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) # 
Download the raw compressed WARC files # Unlike the download_common_crawl function, this does not extract the files @@ -51,7 +51,7 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter ), ): - return add_distributed_args(parser) + return ArgumentHelper(parser).add_distributed_args() if __name__ == "__main__": diff --git a/examples/task_decontamination.py b/examples/task_decontamination.py index 1916c6b8..daf707c3 100644 --- a/examples/task_decontamination.py +++ b/examples/task_decontamination.py @@ -40,7 +40,7 @@ ) from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk from nemo_curator.utils.file_utils import get_all_files_paths_under -from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args +from nemo_curator.utils.script_utils import ArgumentHelper def load_dataset(input_data_dir): @@ -80,7 +80,7 @@ def main(args): ] # Prepare samples for the classifier - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) # Filter data target_dataset = load_dataset(contaminated_dataset_path) @@ -98,7 +98,7 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter ), ): - return add_distributed_args(parser) + return ArgumentHelper(parser).add_distributed_args() if __name__ == "__main__": diff --git a/nemo_curator/sample_dataframe.py b/nemo_curator/sample_dataframe.py index 8c7909dd..15d5e83f 100644 --- a/nemo_curator/sample_dataframe.py +++ b/nemo_curator/sample_dataframe.py @@ -21,7 +21,7 @@ ) from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk from nemo_curator.utils.file_utils import get_all_files_paths_under -from nemo_curator.utils.script_utils import parse_client_args +from nemo_curator.utils.script_utils import ArgumentHelper def sample_dataframe(df, num_samples): @@ -56,9 +56,10 @@ def sample_dataframe(df, num_samples): help="The number of rows to sample", required=True, ) + args = 
parser.parse_args() print(f"Arguments parsed = {args}", flush=True) - client = get_client(**parse_client_args(args), cluster_type="gpu") + client = get_client(**ArgumentHelper.parse_client_args(args), cluster_type="gpu") print("Starting sampling workflow", flush=True) st = time.time() diff --git a/nemo_curator/scripts/add_id.py b/nemo_curator/scripts/add_id.py index c7fed67e..e8419297 100644 --- a/nemo_curator/scripts/add_id.py +++ b/nemo_curator/scripts/add_id.py @@ -22,15 +22,11 @@ expand_outdir_and_mkdir, get_all_files_paths_under, ) -from nemo_curator.utils.script_utils import ( - add_distributed_args, - attach_bool_arg, - parse_client_args, -) +from nemo_curator.utils.script_utils import ArgumentHelper def main(args): - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) output_dir = expand_outdir_and_mkdir(args.output_data_dir) files = get_all_files_paths_under(args.input_data_dir) @@ -73,29 +69,21 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) ): - parser.add_argument( - "--input-data-dir", - type=str, - default=None, - help="Input directory consisting of .jsonl files that are accessible " - "to all nodes. Use this for a distributed file system", + argumentHelper = ArgumentHelper(parser) + + argumentHelper.add_arg_input_data_dir() + argumentHelper.add_arg_input_file_type() + argumentHelper.add_arg_output_data_dir( + help="The output directory to where the jsonl files with ids will " + "be written." ) - parser.add_argument( - "--starting-index", - type=int, - default=None, - help="If supplied, determines the starting index from which to start " - "indexing the documents. By default, it is unspecified, and uses an id" - " scheme that is fast to calculate and is not guaranteed to be ordered.", - ) - parser.add_argument( - "--output-data-dir", - type=str, - default=None, - help="The output directory to where the jsonl " - "files with ids will be written. 
If not specified, the ids will " - "be written in-place", + argumentHelper.add_arg_output_file_type() + argumentHelper.add_arg_seed() + argumentHelper.add_arg_shuffle( + help="Shuffle the order of files before assigning IDs." + "Useful for creating a copy dataset with different IDs" ) + argumentHelper.add_distributed_args() parser.add_argument( "--id-field-name", type=str, @@ -113,36 +101,15 @@ def attach_args( "document belongs to a particular dataset (e.g., wiki for documents" "that come from the wikipedia dataset)", ) - attach_bool_arg( - parser, - "shuffle", - help_str="Shuffle the order of files before assigning IDs." - "Useful for creating a copy dataset with different IDs", - ) parser.add_argument( - "--seed", + "--starting-index", type=int, - default=42, - help="If shuffling is specified, use this random seed to " - "perform the random shuffling", - ) - parser.add_argument( - "--input-file-type", - type=str, - default="jsonl", - help="File type of the dataset to be read in. Supported file formats" - " include 'jsonl' (default), 'pickle', or 'parquet'.", - ) - parser.add_argument( - "--output-file-type", - type=str, - default="jsonl", - help="File type the dataset will be written to. Supported file formats" - " include 'jsonl' (default), 'pickle', or 'parquet'.", + default=None, + help="If supplied, determines the starting index from which to start " + "indexing the documents. 
By default, it is unspecified, and uses an id" + " scheme that is fast to calculate and is not guaranteed to be ordered.", ) - parser = add_distributed_args(parser) - return parser diff --git a/nemo_curator/scripts/blend_datasets.py b/nemo_curator/scripts/blend_datasets.py index 67a833b6..b13ef49d 100644 --- a/nemo_curator/scripts/blend_datasets.py +++ b/nemo_curator/scripts/blend_datasets.py @@ -21,15 +21,11 @@ expand_outdir_and_mkdir, get_all_files_paths_under, ) -from nemo_curator.utils.script_utils import ( - add_distributed_args, - attach_bool_arg, - parse_client_args, -) +from nemo_curator.utils.script_utils import ArgumentHelper def main(args): - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) out_dir = expand_outdir_and_mkdir(args.output_data_dir) @@ -75,6 +71,16 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) ): + argumentHelper = ArgumentHelper(parser) + + argumentHelper.add_arg_input_file_type() + argumentHelper.add_arg_output_data_dir( + help="The output directory to where the blended dataset will be written." + ) + argumentHelper.add_arg_output_file_type() + argumentHelper.add_arg_seed() + argumentHelper.add_arg_shuffle(help="Shuffles the dataset after blending") + argumentHelper.add_distributed_args() parser.add_argument( "--input-data-dirs", type=str, @@ -82,22 +88,6 @@ def attach_args( help="Comma-separated list of directories consisting of dataset " "files that are accessible to all nodes.", ) - parser.add_argument( - "--weights", - type=str, - default=None, - help="Comma-separated list of floating-point weights corresponding " - "to each dataset passed in --input-data-dirs", - ) - parser.add_argument( - "--output-data-dir", - type=str, - default=None, - help="The output directory to where the blended dataset is" - "retained during filtering will be written. 
If this argument " - "is not specified, then the document scores from the " - "filter(s) will be written to the document meta data in place", - ) parser.add_argument( "--target-samples", type=int, @@ -106,35 +96,14 @@ def attach_args( " There may be more samples in order to accurately reflect the " "weight balance, but there will never be less", ) - attach_bool_arg( - parser, - "shuffle", - default=False, - help_str="Shuffles the dataset after blending", - ) parser.add_argument( - "--seed", - type=int, - default=None, - help="If specified, the random seed used for shuffling.", - ) - parser.add_argument( - "--input-file-type", - type=str, - default="jsonl", - help="File type of the dataset to be read in. Supported file formats" - " include 'jsonl' (default), 'pickle', or 'parquet'.", - ) - parser.add_argument( - "--output-file-type", + "--weights", type=str, - default="jsonl", - help="File type the dataset will be written to. Supported file formats" - " include 'jsonl' (default), 'pickle', or 'parquet'.", + default=None, + help="Comma-separated list of floating-point weights corresponding " + "to each dataset passed in --input-data-dirs", ) - parser = add_distributed_args(parser) - return parser diff --git a/nemo_curator/scripts/domain_classifier_inference.py b/nemo_curator/scripts/domain_classifier_inference.py index 128e3e7d..b3a31f1d 100644 --- a/nemo_curator/scripts/domain_classifier_inference.py +++ b/nemo_curator/scripts/domain_classifier_inference.py @@ -23,10 +23,7 @@ # Get relevant args from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk from nemo_curator.utils.file_utils import get_remaining_files -from nemo_curator.utils.script_utils import ( - parse_client_args, - parse_distributed_classifier_args, -) +from nemo_curator.utils.script_utils import ArgumentHelper warnings.filterwarnings("ignore") @@ -61,11 +58,11 @@ def main(): "Travel_and_Transportation", ] - args = parse_distributed_classifier_args().parse_args() + args = 
ArgumentHelper.parse_distributed_classifier_args().parse_args() print(f"Arguments parsed = {args}", flush=True) max_chars = 2000 - client_args = parse_client_args(args) + client_args = ArgumentHelper.parse_client_args(args) client_args["cluster_type"] = "gpu" client = get_client(**client_args) print("Starting domain classifier inference", flush=True) diff --git a/nemo_curator/scripts/download_and_extract.py b/nemo_curator/scripts/download_and_extract.py index d87ef9ad..8eaae4c0 100644 --- a/nemo_curator/scripts/download_and_extract.py +++ b/nemo_curator/scripts/download_and_extract.py @@ -22,11 +22,7 @@ expand_outdir_and_mkdir, get_all_files_paths_under, ) -from nemo_curator.utils.script_utils import ( - add_distributed_args, - attach_bool_arg, - parse_client_args, -) +from nemo_curator.utils.script_utils import ArgumentHelper def read_urls(file_path): @@ -36,7 +32,7 @@ def read_urls(file_path): def main(args): - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) if args.input_url_file: urls = read_urls(args.input_url_file) @@ -107,72 +103,59 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) ): + argumentHelper = ArgumentHelper(parser) + + argumentHelper.add_arg_input_data_dir(help="Path to input data directory.") + argumentHelper.add_arg_input_meta() + argumentHelper.add_distributed_args() parser.add_argument( - "--input-url-file", - type=str, - default=None, - help="Input directory consisting of .jsonl files that are accessible " - "to all nodes. 
Use this for a distributed file system", - ) - parser.add_argument( - "--input-data-dir", - type=str, - default=None, - required=False, - help="Path to input data directory", - ) - parser.add_argument( - "--input-meta", - type=str, - default=None, - help="A string formatted as a dictionary, which outlines the field names and " - "their respective data types within the JSONL input files.", - ) - parser.add_argument( - "--output-json-dir", + "--builder-config-file", type=str, - default=None, - help="Output directory to store the extracted text in jsonl files", + required=True, + help="YAML file that contains paths to implementations of a downloader, " + "iterator and extractor that will be used in this program " + "to build the documents that make up the output dataset", ) - attach_bool_arg( + ArgumentHelper.attach_bool_arg( parser, "download-only", - help_str="Specify this flag if you desire to only download the data" + help="Specify this flag if you desire to only download the data" "files and not extract text from the downloaded files", ) parser.add_argument( - "--builder-config-file", + "--input-url-file", type=str, default=None, - required=True, - help="YAML file that contains paths to implementations of a downloader, " - "iterator and extractor that will be used in this program " - "to build the documents that make up the output dataset", + help="Input directory consisting of .jsonl files that are accessible " + "to all nodes. Use this for a distributed file system", ) - attach_bool_arg( + ArgumentHelper.attach_bool_arg( parser, "keep-downloaded-files", - help_str="If this flag is set to true, the downloaded data files " + help="If this flag is set to true, the downloaded data files " "will be kept on disk and not removed after extraction", ) parser.add_argument( "--output-download-dir", type=str, default=None, - required=False, help="The directory to where data files will be written " "in 'download-only' mode. 
Specify this argument only when " "the '--download-only flag is specified'.", ) - attach_bool_arg( + parser.add_argument( + "--output-json-dir", + type=str, + default=None, + help="Output directory to store the extracted text in jsonl files", + ) + ArgumentHelper.attach_bool_arg( parser, "overwrite-existing-json", - help_str="If this flag is specified, then the json data will be " + help="If this flag is specified, then the json data will be " "overwritten if downloading from the the same file.", ) - parser = add_distributed_args(parser) - return parser diff --git a/nemo_curator/scripts/filter_documents.py b/nemo_curator/scripts/filter_documents.py index 8ab33e8a..023d6d92 100644 --- a/nemo_curator/scripts/filter_documents.py +++ b/nemo_curator/scripts/filter_documents.py @@ -20,11 +20,7 @@ from nemo_curator.utils.config_utils import build_filter_pipeline from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk from nemo_curator.utils.file_utils import expand_outdir_and_mkdir, get_batched_files -from nemo_curator.utils.script_utils import ( - add_distributed_args, - attach_bool_arg, - parse_client_args, -) +from nemo_curator.utils.script_utils import ArgumentHelper def get_dataframe_complement(original_df, filtered_df): @@ -65,7 +61,7 @@ def write_scores(df, output_dir): def main(args): - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) if args.device == "cpu": backend = "pandas" elif args.device == "gpu": @@ -181,20 +177,15 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) ): - parser.add_argument( - "--input-data-dir", - type=str, - default=None, - help="Input directory consisting of dataset files that are accessible " - "to all nodes. Use this for a distributed file system", - ) - parser.add_argument( - "--input-local-data-dir", - type=str, - default=None, - help="Input directory consisting of dataset files. 
" - "Use this argument when a distributed file system is not available.", - ) + argumentHelper = ArgumentHelper(parser) + + argumentHelper.add_arg_batch_size() + argumentHelper.add_arg_input_data_dir() + argumentHelper.add_arg_input_file_type() + argumentHelper.add_arg_input_local_data_dir() + argumentHelper.add_arg_log_dir(default="./log/filter_docs") + argumentHelper.add_arg_output_file_type() + argumentHelper.add_distributed_args() parser.add_argument( "--filter-config-file", type=str, @@ -202,56 +193,17 @@ def attach_args( help="The input filter configuration file that contains the " "path to the filter module as well as the filter parameters", ) - parser.add_argument( - "--output-retained-document-dir", - type=str, - default=None, - help="The output directory to where documents that are " - "retained during filtering will be written. If this argument " - "is not specified, then the document scores from the " - "filter(s) will be written to the document meta data in place", - ) - parser.add_argument( - "--output-removed-document-dir", - type=str, - default=None, - help="The output directory to where documents that are removed during " - "filtering will be written. This argument is mainly for quality control " - "in order examine documents that are not preserved during filtering. " - "If it is not specified and the retained-document-dir is specified, " - "then only the retained documents will be written to disk", - ) - attach_bool_arg( + ArgumentHelper.attach_bool_arg( parser, "filter-only", default=False, - help_str="Specifying this flag will indicate to the code that only the " + help="Specifying this flag will indicate to the code that only the " "filtering operation should be performed and that scores should not be " "computed. 
This flag should be specified if scores have been " "pre-computed on the documents (e.g., the code was run without the " "'--output-retained-document-dir' argument) and users desire to apply " "the filter using the pre-computed scores", ) - attach_bool_arg( - parser, - "log-scores", - default=False, - help_str="Specifying this flag will cause the computed scores to be " - "logged as additional keys for each document. This only applies to " - "filters with 'log_score: True' in the config. This can aid in " - "performing an interactive quality check of the documents.", - ) - parser.add_argument( - "--output-document-score-dir", - type=str, - default=None, - help="The output directory to where the computed document scores will " - "be written. For each filter, its score will be written to a separate " - "file where each line of the file corresponds to the score computed " - "for each document in the corpus within this directory. This only applies to " - "filters with 'log_score: True' in the config. If this directory is not " - "specified, then filter scores will not be written", - ) parser.add_argument( "--id-field", type=str, @@ -262,11 +214,11 @@ def attach_args( "ids will be written to the output score directory such that each line" "is consistent with the lines of the written score files ", ) - attach_bool_arg( + ArgumentHelper.attach_bool_arg( parser, "keep-node-scores-tmp-dir", default=False, - help_str="If multiple nodes are used when computing scores, " + help="If multiple nodes are used when computing scores, " "each node will write out its scores to a temporary directory " "shared across all nodes. Then, the rank 0 node will " "concatenate all of the scores creating the output file. " @@ -282,36 +234,46 @@ def attach_args( "computing scores. 
By default a log message will " "be written every 10000 documents in a file", ) - parser.add_argument( - "--log-dir", - type=str, - default="./log/filter_docs", - help="The output log directory where node and local" - " ranks will write their respective log files", + ArgumentHelper.attach_bool_arg( + parser, + "log-scores", + default=False, + help="Specifying this flag will cause the computed scores to be " + "logged as additional keys for each document. This only applies to " + "filters with 'log_score: True' in the config. This can aid in " + "performing an interactive quality check of the documents.", ) parser.add_argument( - "--input-file-type", + "--output-document-score-dir", type=str, - default="jsonl", - help="File type of the dataset to be read in. Supported file formats" - " include 'jsonl' (default), 'pickle', or 'parquet'.", + default=None, + help="The output directory to where the computed document scores will " + "be written. For each filter, its score will be written to a separate " + "file where each line of the file corresponds to the score computed " + "for each document in the corpus within this directory. This only applies to " + "filters with 'log_score: True' in the config. If this directory is not " + "specified, then filter scores will not be written", ) parser.add_argument( - "--output-file-type", + "--output-removed-document-dir", type=str, - default="jsonl", - help="File type the dataset will be written to. Supported file formats" - " include 'jsonl' (default), 'pickle', or 'parquet'.", + default=None, + help="The output directory to where documents that are removed during " + "filtering will be written. This argument is mainly for quality control " + "in order examine documents that are not preserved during filtering. 
" + "If it is not specified and the retained-document-dir is specified, " + "then only the retained documents will be written to disk", ) parser.add_argument( - "--batch-size", - type=int, - default=64, - help="Number of files to read into memory at a time.", + "--output-retained-document-dir", + type=str, + default=None, + help="The output directory to where documents that are " + "retained during filtering will be written. If this argument " + "is not specified, then the document scores from the " + "filter(s) will be written to the document meta data in place", ) - parser = add_distributed_args(parser) - return parser diff --git a/nemo_curator/scripts/find_exact_duplicates.py b/nemo_curator/scripts/find_exact_duplicates.py index 00018177..7f241f18 100644 --- a/nemo_curator/scripts/find_exact_duplicates.py +++ b/nemo_curator/scripts/find_exact_duplicates.py @@ -23,7 +23,7 @@ from nemo_curator.utils.distributed_utils import get_client, read_data from nemo_curator.utils.file_utils import get_all_files_paths_under from nemo_curator.utils.fuzzy_dedup_utils.io_utils import strip_trailing_sep -from nemo_curator.utils.script_utils import parse_client_args, parse_gpu_dedup_args +from nemo_curator.utils.script_utils import ArgumentHelper def pre_imports(): @@ -38,7 +38,7 @@ def main(args): assert args.hash_method == "md5", "Currently only md5 hash is supported" args.set_torch_to_use_rmm = False - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) logger.info(f"Client Created {client}") if args.device == "gpu": client.run(pre_imports) @@ -85,24 +85,24 @@ def main(args): def attach_args(parser=None): - description = """Compute Exact duplicates in a given dataset. 
- """ if not parser: - parser = parse_gpu_dedup_args(description=description) + description = """Compute Exact duplicates in a given dataset.""" + parser = ArgumentHelper.parse_gpu_dedup_args(description=description) + + argumentHelper = ArgumentHelper(parser) + + argumentHelper.add_arg_output_dir( + help="Output directory where duplicate docs will be written. " + "Each file is a pickle file that contains a dictionary of numpy arrays. " + "The keys are the document ids and the values are the duplicate docs", + required=True, + ) parser.add_argument( "--hash-method", type=str, default="md5", help="Hash Method to use for exact dedup", ) - parser.add_argument( - "--output-dir", - type=str, - required=True, - help="Output directory where duplicate docs will be written. " - "Each file is a pickle file that contains a dictionary of numpy arrays. " - "The keys are the document ids and the values are the duplicate docs", - ) return parser diff --git a/nemo_curator/scripts/find_matching_ngrams.py b/nemo_curator/scripts/find_matching_ngrams.py index ab2bc9ee..6c326e61 100644 --- a/nemo_curator/scripts/find_matching_ngrams.py +++ b/nemo_curator/scripts/find_matching_ngrams.py @@ -19,11 +19,11 @@ from nemo_curator.datasets import DocumentDataset from nemo_curator.utils.distributed_utils import get_client, read_data from nemo_curator.utils.file_utils import get_all_files_paths_under -from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args +from nemo_curator.utils.script_utils import ArgumentHelper def main(args): - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) # Each rank read in the task data with open(args.input_task_ngrams, "rb") as fp: @@ -60,29 +60,12 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) ): - parser.add_argument( - "--input-data-dir", - type=str, - default=None, - help="Input directory consisting of .jsonl files that are accessible " - "to 
all nodes. Use this for a distributed file system", - ) - parser.add_argument( - "--input-text-field", - type=str, - default="text", - help="The name of the field within each datapoint object of the input " - "file that contains the text.", - ) - parser.add_argument( - "--output-matched-ngram-data", - type=str, - default=None, - help="Output dictionary that contains the output matched n-grams " - "and the frequency of their matches, min-ngram size, max-ngram " - "size and the frequencies of n-gram sizes. All of these data will be " - "used by remove_matching_grams for which this program is a prequisite", - ) + argumentHelper = ArgumentHelper(parser) + + argumentHelper.add_arg_input_data_dir() + argumentHelper.add_arg_input_file_type() + argumentHelper.add_arg_input_text_field() + argumentHelper.add_distributed_args() parser.add_argument( "--input-task-ngrams", type=str, @@ -102,15 +85,15 @@ def attach_args( help="The minimum n-gram size to consider within the datset", ) parser.add_argument( - "--input-file-type", + "--output-matched-ngram-data", type=str, - default="jsonl", - help="File type of the dataset to be read in. Supported file formats" - " include 'jsonl' (default), 'pickle', or 'parquet'.", + default=None, + help="Output dictionary that contains the output matched n-grams " + "and the frequency of their matches, min-ngram size, max-ngram " + "size and the frequencies of n-gram sizes. 
All of these data will be " + "used by remove_matching_grams for which this program is a prequisite", ) - parser = add_distributed_args(parser) - return parser diff --git a/nemo_curator/scripts/find_pii_and_deidentify.py b/nemo_curator/scripts/find_pii_and_deidentify.py index e049abc2..619745b0 100644 --- a/nemo_curator/scripts/find_pii_and_deidentify.py +++ b/nemo_curator/scripts/find_pii_and_deidentify.py @@ -20,11 +20,9 @@ from nemo_curator.datasets import DocumentDataset from nemo_curator.modifiers.pii_modifier import PiiModifier from nemo_curator.modules.modify import Modify - -# from nemo_curator.pii.algorithm import DEFAULT_LANGUAGE from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk from nemo_curator.utils.file_utils import get_batched_files -from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args +from nemo_curator.utils.script_utils import ArgumentHelper def main(args): @@ -92,89 +90,54 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) ): + argumentHelper = ArgumentHelper(parser) - parser.add_argument( - "--language", - type=str, - default="en", - required=False, - help="Language of input documents", + argumentHelper.add_arg_batch_size( + default=2000, help="The batch size for processing multiple texts together." ) - - parser.add_argument( - "--supported-entities", - type=str, - default=None, - required=False, - help="Comma separated list of PII entity types. None implies all supported types", + argumentHelper.add_arg_input_data_dir(help="Directory containing the input files.") + argumentHelper.add_arg_input_file_type() + argumentHelper.add_arg_language(help="Language of input documents") + argumentHelper.add_arg_output_data_dir( + help="The output directory to where redacted documents will be written." ) - + argumentHelper.add_arg_output_file_type() parser.add_argument( "--anonymize-action", type=str, default="replace", - required=False, help="Anonymization action. 
Choose from among: redact, hash, mask and replace", ) - - parser.add_argument( - "--hash-type", - type=str, - default=None, - required=False, - help="The hash type. Choose from among: sha256, sha512 or md5", - ) - parser.add_argument( "--chars-to-mask", type=int, default=100, - required=False, help="The number of characters to mask. Only applicable if anonymize action is mask", ) - + parser.add_argument( + "--hash-type", + type=str, + default=None, + help="The hash type. Choose from among: sha256, sha512 or md5", + ) parser.add_argument( "--masking-char", type=str, default="*", - required=False, help="The masking character. Only applicable if anonymize action is mask", ) - parser.add_argument( "--new-value", type=str, default=None, - required=False, help="The new value to replace with. Only applicable if anonymize action is replace", ) - parser.add_argument( - "--input-data-dir", + "--supported-entities", type=str, default=None, - required=True, - help="Directory containing the input files", - ) - - parser.add_argument( - "--input-file-type", - type=str, - default="jsonl", - required=True, - choices=["jsonl", "csv", "text"], - help="The input file type (only jsonl is currently supported)", - ) - - parser.add_argument( - "--output-file-type", - type=str, - default="jsonl", - required=True, - choices=["jsonl", "csv", "text"], - help="The output file type (only jsonl is currently supported)", + help="Comma separated list of PII entity types. 
None implies all supported types", ) - parser.add_argument( "--text-field", type=str, @@ -184,30 +147,16 @@ def attach_args( "field but other fields can be specified such as 'url' or 'id'.", ) - parser.add_argument( - "--batch-size", - type=int, - default=2000, - help="The batch size for processing multiple texts together.", - ) - - parser.add_argument( - "--output-data-dir", - type=str, - default=None, - required=True, - help="The output directory to where redacted documents will be written.", - ) - return parser def console_script(): - arguments = add_distributed_args(attach_args()).parse_args() - client = get_client(**parse_client_args(arguments)) - if not arguments.n_workers: - arguments.n_workers = len(client.scheduler_info()["workers"]) - main(arguments) + parser = attach_args() + args = ArgumentHelper(parser).add_distributed_args().parse_args() + client = get_client(**ArgumentHelper.parse_client_args(args)) + if not args.n_workers: + args.n_workers = len(client.scheduler_info()["workers"]) + main(args) if __name__ == "__main__": diff --git a/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py b/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py index 1c461777..01baee05 100644 --- a/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py +++ b/nemo_curator/scripts/fuzzy_deduplication/compute_minhashes.py @@ -25,7 +25,7 @@ ) from nemo_curator.utils.file_utils import get_all_files_paths_under from nemo_curator.utils.fuzzy_dedup_utils.io_utils import strip_trailing_sep -from nemo_curator.utils.script_utils import parse_client_args, parse_gpu_dedup_args +from nemo_curator.utils.script_utils import ArgumentHelper def pre_imports(): @@ -42,7 +42,7 @@ def main(args): assert args.device == "gpu" args.set_torch_to_use_rmm = False - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) logger.info(f"Client Created {client}") client.run(pre_imports) logger.info("Pre imports complete") @@ 
-106,18 +106,19 @@ def main(args): def attach_args(parser=None): - description = """Computes minhash signatures from an input directory of documents - contained within jsonl files. For each document a dataframe of document-ids - -minhash signatures is created. This dataframe is written to file after processing - """ if not parser: - parser = parse_gpu_dedup_args(description=description) + description = """Computes minhash signatures from an input directory of documents + contained within jsonl files. For each document a dataframe of document-ids + -minhash signatures is created. This dataframe is written to file after processing + """ + parser = ArgumentHelper.parse_gpu_dedup_args(description=description) - parser.add_argument( - "--minhash-length", - type=int, - default=260, - help="The number of minhashes to compute for each document.", + argumentHelper = ArgumentHelper(parser) + + argumentHelper.add_arg_minhash_length() + argumentHelper.add_arg_seed( + help="Random seed used for intializing the hash " + "functions used to compute the MinHashes" ) parser.add_argument( "--char-ngram", @@ -125,7 +126,7 @@ def attach_args(parser=None): default=5, help="The number of consecutive characters to include in a sliding " "window when creating the document shingles for computing " - "MinHash signatures.", + "minhash signatures.", ) parser.add_argument( "--hash-bytes", @@ -134,13 +135,6 @@ def attach_args(parser=None): help="Number of bytes per computed minhash " "(default is an unsigned 32-bit integer)", ) - parser.add_argument( - "--seed", - type=int, - default=42, - help="Random seed used for intializing the hash " - "functions used to compute the MinHashes", - ) parser.add_argument( "--output-minhash-dir", type=str, @@ -149,6 +143,7 @@ def attach_args(parser=None): "Each file is a parquet file that contains two series, the document ids, " "and a series of lists, each list denoting the minhash signature for that document id.", ) + return parser diff --git 
a/nemo_curator/scripts/fuzzy_deduplication/connected_components.py b/nemo_curator/scripts/fuzzy_deduplication/connected_components.py index f8da010e..7ec8c5ab 100644 --- a/nemo_curator/scripts/fuzzy_deduplication/connected_components.py +++ b/nemo_curator/scripts/fuzzy_deduplication/connected_components.py @@ -17,7 +17,7 @@ from nemo_curator.modules.fuzzy_dedup import ConnectedComponents from nemo_curator.utils.distributed_utils import get_client -from nemo_curator.utils.script_utils import parse_client_args, parse_gpu_dedup_args +from nemo_curator.utils.script_utils import ArgumentHelper def main(args): @@ -34,7 +34,7 @@ def main(args): args.set_torch_to_use_rmm = False args.enable_spilling = True - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) components_stage = ConnectedComponents( cache_dir=args.cache_dir, @@ -49,24 +49,22 @@ def main(args): def attach_args(parser=None): - description = """Computes connected component""" if not parser: - parser = parse_gpu_dedup_args(description=description) + description = """Computes connected component""" + parser = ArgumentHelper.parse_gpu_dedup_args(description=description) + argumentHelper = ArgumentHelper(parser) + + argumentHelper.add_arg_output_dir() parser.add_argument( - "--jaccard-pairs-path", - type=str, - help="The directory containing the jaccard results", - ) - parser.add_argument( - "--output-dir", + "--cache-dir", type=str, - help="The output directory to write results to", + help="The cache directory to write intermediate results to", ) parser.add_argument( - "--cache-dir", + "--jaccard-pairs-path", type=str, - help="The cache directory to write intermediate results to", + help="The directory containing the jaccard results", ) parser.add_argument( "--jaccard-threshold", @@ -75,6 +73,7 @@ def attach_args(parser=None): help="Jaccard threshold below which we don't consider documents" " to be duplicate", ) + return parser diff --git 
a/nemo_curator/scripts/fuzzy_deduplication/jaccard_compute.py b/nemo_curator/scripts/fuzzy_deduplication/jaccard_compute.py index cb645184..138b4e9b 100644 --- a/nemo_curator/scripts/fuzzy_deduplication/jaccard_compute.py +++ b/nemo_curator/scripts/fuzzy_deduplication/jaccard_compute.py @@ -17,7 +17,7 @@ from nemo_curator.modules.fuzzy_dedup import JaccardSimilarity from nemo_curator.utils.distributed_utils import get_client, get_num_workers -from nemo_curator.utils.script_utils import parse_client_args, parse_gpu_dedup_args +from nemo_curator.utils.script_utils import ArgumentHelper def main(args): @@ -31,7 +31,7 @@ def main(args): OUTPUT_PATH, "jaccard_similarity_results.parquet" ) args.enable_spilling = True - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) print(f"Num Workers = {get_num_workers(client)}", flush=True) print("Connected to dask cluster", flush=True) @@ -55,26 +55,23 @@ def main(args): def attach_args(parser=None): - description = """Computes jaccard similarity""" if not parser: - parser = parse_gpu_dedup_args(description=description) + description = """Computes jaccard similarity""" + parser = ArgumentHelper.parse_gpu_dedup_args(description=description) - parser.add_argument( - "--shuffled-docs-path", - type=str, - help="The directory containing the shuffled documents", - ) - parser.add_argument( - "--output-dir", - type=str, - help="The output directory to write results to", - ) + ArgumentHelper(parser).add_arg_output_dir() parser.add_argument( "--ngram-size", type=int, default=5, help="Size of ngram to use during jaccard similarity", ) + parser.add_argument( + "--shuffled-docs-path", + type=str, + help="The directory containing the shuffled documents", + ) + return parser diff --git a/nemo_curator/scripts/fuzzy_deduplication/jaccard_shuffle.py b/nemo_curator/scripts/fuzzy_deduplication/jaccard_shuffle.py index 39911de5..e46f36e2 100644 --- 
a/nemo_curator/scripts/fuzzy_deduplication/jaccard_shuffle.py +++ b/nemo_curator/scripts/fuzzy_deduplication/jaccard_shuffle.py @@ -20,7 +20,7 @@ from nemo_curator.utils.fuzzy_dedup_utils.io_utils import ( get_text_ddf_from_json_path_with_blocksize, ) -from nemo_curator.utils.script_utils import parse_client_args, parse_gpu_dedup_args +from nemo_curator.utils.script_utils import ArgumentHelper def func(): @@ -35,7 +35,7 @@ def main(args): OUTPUT_PATH = args.output_dir output_shuffled_docs_path = os.path.join(OUTPUT_PATH, "shuffled_docs.parquet") - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) client.run(func) print(f"Num Workers = {get_num_workers(client)}", flush=True) print("Connected to dask cluster", flush=True) @@ -75,41 +75,34 @@ def main(args): def attach_args(parser=None): - description = """Shuffles input text documents based on the given bucket - map. The output is a partitioned parquet dataset with the documents - shuffled by buckets - """ if not parser: - parser = parse_gpu_dedup_args(description=description) + description = """Shuffles input text documents based on the given bucket + map. 
The output is a partitioned parquet dataset with the documents + shuffled by buckets + """ + parser = ArgumentHelper.parse_gpu_dedup_args(description=description) + argumentHelper = ArgumentHelper(parser) + + argumentHelper.add_arg_input_meta() + argumentHelper.add_arg_output_dir() + argumentHelper.add_arg_text_ddf_blocksize() parser.add_argument( - "--input-bucket-mapping-dir", - type=str, - help="The directory containing anchor docs with bk files", - ) - parser.add_argument( - "--input-meta", - type=str, - default=None, - help="A string formatted as a dictionary, which outlines the field names and " - "their respective data types within the JSONL input files.", - ) - parser.add_argument( - "--text-ddf-blocksize", + "--bucket-mapping-ddf-blocksize", type=int, default=256, - help="The block size for chunking jsonl files for text ddf in mb", + help="The block size for for anchor_docs_with_bk ddf in mb", ) parser.add_argument( - "--bucket-mapping-ddf-blocksize", + "--bucket-parts-per-worker", + default=8, type=int, - default=256, - help="The block size for for anchor_docs_with_bk ddf in mb", + help="The number of bucket parts to process per worker per batch", ) parser.add_argument( - "--output-dir", + "--input-bucket-mapping-dir", type=str, - help="The output directory to write results in", + help="The directory containing anchor docs with bk files", ) parser.add_argument( "--parts-per-worker", @@ -117,12 +110,6 @@ def attach_args(parser=None): type=int, help="The number of parts to process per worker per batch", ) - parser.add_argument( - "--bucket-parts-per-worker", - default=8, - type=int, - help="The number of bucket parts to process per worker per batch", - ) return parser diff --git a/nemo_curator/scripts/fuzzy_deduplication/map_buckets.py b/nemo_curator/scripts/fuzzy_deduplication/map_buckets.py index 07d92791..af085368 100644 --- a/nemo_curator/scripts/fuzzy_deduplication/map_buckets.py +++ b/nemo_curator/scripts/fuzzy_deduplication/map_buckets.py @@ -21,7 
+21,7 @@ get_bucket_ddf_from_parquet_path, get_text_ddf_from_json_path_with_blocksize, ) -from nemo_curator.utils.script_utils import parse_client_args, parse_gpu_dedup_args +from nemo_curator.utils.script_utils import ArgumentHelper def get_anchor_and_output_map_info( @@ -70,34 +70,28 @@ def get_anchor_and_output_map_info( def attach_args(parser=None): - description = """Takes the buckets generated from minhashes and uses - document length information to create a coarse mapping of mapping multiple - buckets to a logical partition by using a modified bin packing algorithm. - """ if not parser: - parser = parse_gpu_dedup_args(description=description) + description = """Takes the buckets generated from minhashes and uses + document length information to create a coarse mapping of mapping multiple + buckets to a logical partition by using a modified bin packing algorithm. + """ + parser = ArgumentHelper.parse_gpu_dedup_args(description=description) + + argumentHelper = ArgumentHelper(parser) + + argumentHelper.add_arg_input_meta() + argumentHelper.add_arg_output_dir() + argumentHelper.add_arg_text_ddf_blocksize() parser.add_argument( "--input-bucket-dir", type=str, help="The directory containing bucket information files", ) parser.add_argument( - "--input-meta", - type=str, - default=None, - help="A string formatted as a dictionary, which outlines the field names and " - "their respective data types within the JSONL input files.", - ) - parser.add_argument( - "--text-ddf-blocksize", - type=int, - default=256, - help="The block size for chunking jsonl files for text ddf in mb", - ) - parser.add_argument( - "--output-dir", + "--input-bucket-field", type=str, - help="The output directory to write results in", + default="_bucket_id", + help="Name of the column containing minhashes", ) parser.add_argument( "--shuffle-type", @@ -105,12 +99,7 @@ def attach_args(parser=None): default="tasks", help="Type of shuffle to use before writing to parquet", ) - parser.add_argument( - 
"--input-bucket-field", - type=str, - default="_bucket_id", - help="Name of the column containing minhashes", - ) + return parser @@ -165,7 +154,7 @@ def main(args): output_anchor_docs_with_bk_path = os.path.join( OUTPUT_PATH, "anchor_docs_with_bk.parquet" ) - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) print(f"Num Workers = {get_num_workers(client)}", flush=True) print("Connected to dask cluster", flush=True) print("Running jaccard map buckets script", flush=True) diff --git a/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py b/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py index e63ce2d5..d7575835 100644 --- a/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py +++ b/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py @@ -24,7 +24,7 @@ from nemo_curator.log import create_logger from nemo_curator.utils.distributed_utils import get_client from nemo_curator.utils.fuzzy_dedup_utils.id_mapping import convert_str_id_to_int -from nemo_curator.utils.script_utils import parse_client_args, parse_gpu_dedup_args +from nemo_curator.utils.script_utils import ArgumentHelper def pre_imports(): @@ -40,7 +40,7 @@ def main(args): assert args.device == "gpu" args.set_torch_to_use_rmm = False - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) logger.info(f"Client Created {client}") client.run(pre_imports) logger.info("Pre imports complete") @@ -78,18 +78,21 @@ def main(args): def attach_args(parser=None): - description = """Compute buckets from existing minhashes and writes the output - to files. Each row corresponding to a document-id followed by the columns - denoting the bucket id's that document belongs to. - """ if not parser: - parser = parse_gpu_dedup_args(description=description) + description = """Compute buckets from existing minhashes and writes the output + to files. 
Each row corresponding to a document-id followed by the columns + denoting the bucket id's that document belongs to. + """ + parser = ArgumentHelper.parse_gpu_dedup_args(description=description) + argumentHelper = ArgumentHelper(parser) + + argumentHelper.add_arg_minhash_length() parser.add_argument( - "--minhash-length", + "--buckets-per-shuffle", type=int, - default=260, - help="The minhash signature length of each input document", + required=True, + help="Number of buckets to shuffle per batch", ) parser.add_argument( "--input-minhash-field", @@ -103,12 +106,6 @@ def attach_args(parser=None): default=20, help="The number of minhashes to compute for each document.", ) - parser.add_argument( - "--buckets-per-shuffle", - type=int, - required=True, - help="Number of buckets to shuffle per batch", - ) parser.add_argument( "--output-bucket-dir", type=str, diff --git a/nemo_curator/scripts/get_common_crawl_urls.py b/nemo_curator/scripts/get_common_crawl_urls.py index 1e9c6455..651738b2 100644 --- a/nemo_curator/scripts/get_common_crawl_urls.py +++ b/nemo_curator/scripts/get_common_crawl_urls.py @@ -15,7 +15,7 @@ import argparse from nemo_curator.utils.download_utils import get_common_crawl_urls -from nemo_curator.utils.script_utils import attach_bool_arg +from nemo_curator.utils.script_utils import ArgumentHelper def main(args): @@ -51,6 +51,14 @@ def attach_args( "file to create the URL. By default this value is " " 'https://data.commoncrawl.org'", ) + parser.add_argument( + "--ending-snapshot", + type=str, + default="2020-50", + help="The last snapshot for which WARC urls will be retrieved. " + "Snapshots must be specified by YYYY-WeekNumber " + "(e.g., '2020-50' or '2021-04')", + ) parser.add_argument( "--cc-index-prefix", type=str, @@ -58,6 +66,15 @@ def attach_args( help="The prefix of the URL to the Common Crawl index. 
" "By default this value is 'https://index.commoncrawl.org'", ) + ArgumentHelper.attach_bool_arg( + parser, + "cc-news", + help="Specify --cc-news in order to download WARC URLs for " + "the CC-NEWS dataset instead of the CC-MAIN datasets. If this " + "is specified, then it is assumed that the format for the start " + "and end snapshots is 'YYYY-MM' (Year-Month). All WARC URLs between " + "the specified years and months will be download", + ) parser.add_argument( "--output-warc-url-file", type=str, @@ -76,23 +93,7 @@ def attach_args( "(specified with the '--cc-news' flag) this changes to " "Year-Month (YYYY-MM)", ) - parser.add_argument( - "--ending-snapshot", - type=str, - default="2020-50", - help="The last snapshot for which WARC urls will be retrieved. " - "Snapshots must be specified by YYYY-WeekNumber " - "(e.g., '2020-50' or '2021-04')", - ) - attach_bool_arg( - parser, - "cc-news", - help_str="Specify --cc-news in order to download WARC URLs for " - "the CC-NEWS dataset instead of the CC-MAIN datasets. If this " - "is specified, then it is assumed that the format for the start " - "and end snapshots is 'YYYY-MM' (Year-Month). 
All WARC URLs between " - "the specified years and months will be download", - ) + return parser diff --git a/nemo_curator/scripts/get_wikipedia_urls.py b/nemo_curator/scripts/get_wikipedia_urls.py index 8ccb4f40..0f45afe8 100644 --- a/nemo_curator/scripts/get_wikipedia_urls.py +++ b/nemo_curator/scripts/get_wikipedia_urls.py @@ -15,6 +15,7 @@ import argparse from nemo_curator.utils.download_utils import get_wikipedia_urls +from nemo_curator.utils.script_utils import ArgumentHelper def main(args): @@ -35,11 +36,15 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) ): + ArgumentHelper(parser).add_arg_language( + help="Desired language of the Wikipedia dump" + ) parser.add_argument( - "--language", + "--output-url-file", type=str, - default="en", - help="Desired language of the Wikipedia dump", + default="wikipedia_urls_latest.txt", + help="The output file to which the urls containing " + "the latest dump data will be written", ) parser.add_argument( "--wikidumps-index-baseurl", @@ -47,13 +52,7 @@ def attach_args( default="https://dumps.wikimedia.org", help="The base url for all Wikipedia dumps", ) - parser.add_argument( - "--output-url-file", - type=str, - default="wikipedia_urls_latest.txt", - help="The output file to which the urls containing " - "the latest dump data will be written", - ) + return parser diff --git a/nemo_curator/scripts/make_data_shards.py b/nemo_curator/scripts/make_data_shards.py index e4160e99..e1344f04 100644 --- a/nemo_curator/scripts/make_data_shards.py +++ b/nemo_curator/scripts/make_data_shards.py @@ -16,11 +16,11 @@ from nemo_curator.utils.distributed_utils import get_client from nemo_curator.utils.file_utils import reshard_jsonl -from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args +from nemo_curator.utils.script_utils import ArgumentHelper def main(args): - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) 
reshard_jsonl( args.input_data_dir, @@ -46,26 +46,29 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) ): + argumentHelper = ArgumentHelper(parser) + + argumentHelper.add_arg_input_data_dir() + argumentHelper.add_distributed_args() parser.add_argument( - "--input-data-dir", + "--output-file-size", type=str, - default=None, - required=True, - help="Input directory consisting of .jsonl file(s)", + default="100M", + help="Approximate size of output files. Must specify with a string and " + "with the unit K, M or G for kilo, mega or gigabytes", ) parser.add_argument( "--output-resharded-dir", type=str, default=None, required=True, - help="Output directory to where the sharded " ".jsonl files will be written", + help="Output directory to where the sharded .jsonl files will be written", ) parser.add_argument( - "--output-file-size", + "--prefix", type=str, - default="100M", - help="Approximate size of output files. Must specify with a string and " - "with the unit K, M or G for kilo, mega or gigabytes", + default="", + help="Prefix to use to prepend to output file number", ) parser.add_argument( "--start-index", @@ -73,14 +76,6 @@ def attach_args( default=0, help="Starting index for naming the output files", ) - parser.add_argument( - "--prefix", - type=str, - default="", - help="Prefix to use to prepend to output file number", - ) - - parser = add_distributed_args(parser) return parser diff --git a/nemo_curator/scripts/prepare_fasttext_training_data.py b/nemo_curator/scripts/prepare_fasttext_training_data.py index 551b3646..e1abcdf4 100644 --- a/nemo_curator/scripts/prepare_fasttext_training_data.py +++ b/nemo_curator/scripts/prepare_fasttext_training_data.py @@ -20,7 +20,7 @@ from nemo_curator.modules import Modify from nemo_curator.utils.distributed_utils import get_client, read_data from nemo_curator.utils.file_utils import get_all_files_paths_under -from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args +from 
nemo_curator.utils.script_utils import ArgumentHelper def sample_rows(df, n, seed): @@ -30,7 +30,7 @@ def sample_rows(df, n, seed): def main(args): - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) # Get local path files = list(get_all_files_paths_under(args.input_data_dir)) raw_data = read_data(files, file_type="jsonl", backend="pandas") @@ -67,20 +67,18 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) ): - parser.add_argument( - "--input-data-dir", - type=str, - default=None, - help="Input directory consisting of .jsonl files that are accessible " - "to all nodes. Use this for a distributed file system", + argumentHelper = ArgumentHelper(parser) + + argumentHelper.add_arg_input_data_dir() + argumentHelper.add_arg_log_dir(default="./log/prepare_filter_data") + argumentHelper.add_arg_output_train_file( + help="The output file containing prepared samples to train a " + "skip-gram classifier with FastText" ) - parser.add_argument( - "--input-local-data-dir", - type=str, - default=None, - help="Input directory consisting of .jsonl files. " - "Use this argument when a distributed file system is not available.", + argumentHelper.add_arg_seed( + help="The random seed to use for sampling from the dataset" ) + argumentHelper.add_distributed_args() parser.add_argument( "--input-json-field", type=str, @@ -89,28 +87,6 @@ def attach_args( "operate. 
By default, the filter will operate on the 'text' " "field but other fields can be specified such as 'url' or 'id'.", ) - parser.add_argument( - "--output-num-samples", - type=int, - default=None, - required=True, - help="The number of documents to randomly sample from the dataset and" - " use as training and validation samples to train the" - " skip-gram classifier", - ) - parser.add_argument( - "--output-train-file", - type=str, - default=None, - help="The output file containing prepared samples to train a " - "skip-gram classifier with FastText", - ) - parser.add_argument( - "--seed", - type=int, - default=42, - help="The random seed to use for sampling from the dataset", - ) parser.add_argument( "--label", type=str, @@ -120,15 +96,6 @@ def attach_args( "in the output file. For example '__label__hq' could be " "used for the high-quality (positive) samples", ) - parser.add_argument( - "--log-dir", - type=str, - default="./log/prepare_filter_data", - help="The output log directory where node and local" - " ranks will write their respective log files", - ) - - parser = add_distributed_args(parser) return parser diff --git a/nemo_curator/scripts/prepare_task_data.py b/nemo_curator/scripts/prepare_task_data.py index 45ed800b..7455ce66 100644 --- a/nemo_curator/scripts/prepare_task_data.py +++ b/nemo_curator/scripts/prepare_task_data.py @@ -20,11 +20,11 @@ import nemo_curator from nemo_curator.tasks.downstream_task import import_task from nemo_curator.utils.distributed_utils import get_client -from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args +from nemo_curator.utils.script_utils import ArgumentHelper def main(args): - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) # Read in config file with open(args.task_config_file, "r") as config_file: task_params = yaml.load(config_file, Loader=yaml.FullLoader) @@ -57,15 +57,7 @@ def attach_args( 
formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) ): - parser.add_argument( - "--task-config-file", - type=str, - default=None, - required=True, - help="YAML configuration file that contains task information. " - "YAML files for already implemented tasks can be found in the config " - "directory that is located in the root directory of this repository.", - ) + ArgumentHelper(parser).add_distributed_args() parser.add_argument( "--output-task-ngrams", type=str, @@ -75,8 +67,15 @@ def attach_args( "are the frequencies of which the n-grams occurr within a " "training dataset (they are initialized to zero within this program)", ) - - parser = add_distributed_args(parser) + parser.add_argument( + "--task-config-file", + type=str, + default=None, + required=True, + help="YAML configuration file that contains task information. " + "YAML files for already implemented tasks can be found in the config " + "directory that is located in the root directory of this repository.", + ) return parser diff --git a/nemo_curator/scripts/quality_classifier_inference.py b/nemo_curator/scripts/quality_classifier_inference.py index e538b032..7ac0b699 100644 --- a/nemo_curator/scripts/quality_classifier_inference.py +++ b/nemo_curator/scripts/quality_classifier_inference.py @@ -21,28 +21,11 @@ from nemo_curator.datasets import DocumentDataset from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk from nemo_curator.utils.file_utils import get_remaining_files -from nemo_curator.utils.script_utils import ( - parse_client_args, - parse_distributed_classifier_args, -) +from nemo_curator.utils.script_utils import ArgumentHelper warnings.filterwarnings("ignore") -def add_quality_model_specific_args(parser): - """ - This function adds a command line argument for the number of labels. - - Args: - parser: An argparse ArgumentParser object. - Returns: - An argparse ArgumentParser with 1 additional argument. 
- - """ - parser.add_argument("--num-labels", type=int, default=3) - return parser - - def get_labels(num_labels): """ This function returns a list of quality labels, depending on how many labels the user expects. @@ -61,14 +44,14 @@ def get_labels(num_labels): def main(): - parser = parse_distributed_classifier_args() - parser = add_quality_model_specific_args(parser) + parser = ArgumentHelper.parse_distributed_classifier_args() + parser.add_argument("--num-labels", type=int, default=3) args = parser.parse_args() labels = get_labels(args.num_labels) print(f"Arguments parsed = {args}", flush=True) max_chars = 6000 - client_args = parse_client_args(args) + client_args = ArgumentHelper.parse_client_args(args) client_args["cluster_type"] = "gpu" client = get_client(**client_args) print("Starting quality classifier inference", flush=True) diff --git a/nemo_curator/scripts/remove_matching_ngrams.py b/nemo_curator/scripts/remove_matching_ngrams.py index 859a6902..8238cc26 100644 --- a/nemo_curator/scripts/remove_matching_ngrams.py +++ b/nemo_curator/scripts/remove_matching_ngrams.py @@ -23,11 +23,11 @@ get_all_files_paths_under, get_batched_files, ) -from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args +from nemo_curator.utils.script_utils import ArgumentHelper def main(args): - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) output_tdd_dir = expand_outdir_and_mkdir(args.output_task_deduped_dir) output_rm_doc_dir = None @@ -95,20 +95,14 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) ): - parser.add_argument( - "--input-data-dir", - type=str, - default=None, - help="Input directory consisting of .jsonl files that are accessible " - "to all nodes. 
Use this for a distributed file system", - ) - parser.add_argument( - "--input-text-field", - type=str, - default="text", - help="The name of the field within each datapoint object of the input " - "file that contains the text.", - ) + argumentHelper = ArgumentHelper(parser) + + argumentHelper.add_arg_batch_size() + argumentHelper.add_arg_input_data_dir() + argumentHelper.add_arg_input_file_type() + argumentHelper.add_arg_input_text_field() + argumentHelper.add_arg_output_file_type() + argumentHelper.add_distributed_args() parser.add_argument( "--input-matched-ngrams", type=str, @@ -117,23 +111,6 @@ def attach_args( help="Input dictionary (.pkl file), that contains matched " "n-gram data from the find_matching_ngrams code", ) - parser.add_argument( - "--output-task-deduped-dir", - type=str, - default=None, - required=True, - help="Output directory to where task-deduplicated (split) " - "documents will be written", - ) - parser.add_argument( - "--output-removed-doc-dir", - type=str, - default=None, - help="Output directory to where removed documents will be written. " - "Documents will be removed from the corpus if they are split more " - "than --max-document-splits number of times, or if the user specifies " - "that they be removed via the flag, --remove-split-docs", - ) parser.add_argument( "--match-threshold", type=int, @@ -152,28 +129,23 @@ def attach_args( "--max-document-splits number of times", ) parser.add_argument( - "--input-file-type", + "--output-removed-doc-dir", type=str, - default="jsonl", - help="File type of the dataset to be read in. Supported file formats" - " include 'jsonl' (default), 'pickle', or 'parquet'.", + default=None, + help="Output directory to where removed documents will be written. 
" + "Documents will be removed from the corpus if they are split more " + "than --max-document-splits number of times, or if the user specifies " + "that they be removed via the flag, --remove-split-docs", ) parser.add_argument( - "--output-file-type", + "--output-task-deduped-dir", type=str, - default="jsonl", - help="File type the dataset will be written to. Supported file formats" - " include 'jsonl' (default), 'pickle', or 'parquet'.", - ) - parser.add_argument( - "--batch-size", - type=int, - default=64, - help="Number of files to read into memory at a time.", + default=None, + required=True, + help="Output directory to where task-deduplicated (split) " + "documents will be written", ) - parser = add_distributed_args(parser) - return parser diff --git a/nemo_curator/scripts/separate_by_metadata.py b/nemo_curator/scripts/separate_by_metadata.py index 39f9ce68..e8ff4f92 100644 --- a/nemo_curator/scripts/separate_by_metadata.py +++ b/nemo_curator/scripts/separate_by_metadata.py @@ -22,15 +22,11 @@ get_all_files_paths_under, separate_by_metadata, ) -from nemo_curator.utils.script_utils import ( - add_distributed_args, - attach_bool_arg, - parse_client_args, -) +from nemo_curator.utils.script_utils import ArgumentHelper def main(args): - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) files = get_all_files_paths_under(args.input_data_dir) input_data = read_data( @@ -67,13 +63,17 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) ): - parser.add_argument( - "--input-data-dir", - type=str, - default=None, - help="Input directory consisting of .jsonl files that are accessible " - "to all nodes. 
Use this for a distributed file system", + argumentHelper = ArgumentHelper(parser) + + argumentHelper.add_arg_input_data_dir() + argumentHelper.add_arg_input_file_type() + argumentHelper.add_arg_output_data_dir( + help="The output directory to where the metadata-separated files " + "will be written. Each file will be written to its respective " + "metadata directory that is a sub-directory of this directory" ) + argumentHelper.add_arg_output_file_type() + argumentHelper.add_distributed_args() parser.add_argument( "--input-metadata-field", type=str, @@ -87,46 +87,21 @@ def attach_args( help="Output json file containing the frequency of documents " "that occur for a particular metadata.", ) - parser.add_argument( - "--output-data-dir", - type=str, - required=True, - help="The output directory to where the metadata-separated " - "files will be written. Each file will be written to its " - "respective metadata directory that is a sub-directory " - "of this directory", + ArgumentHelper.attach_bool_arg( + parser, + "remove-input-dir", + default=False, + help="Specify '--remove-input-dir' to remove the original " + "input directory. This is false by default.", ) - attach_bool_arg( + ArgumentHelper.attach_bool_arg( parser, "remove-metadata-field", default=False, - help_str="Option of whether to remove the metadata field " + help="Option of whether to remove the metadata field " "after filtering. Useful only in the case in which one metadata " "is desired to be separated from the others", ) - attach_bool_arg( - parser, - "remove-input-dir", - default=False, - help_str="Specify '--remove-input-dir' to remove the original " - "input directory. This is false by default.", - ) - parser.add_argument( - "--input-file-type", - type=str, - default="jsonl", - help="File type of the dataset to be read in. 
Supported file formats" - " include 'jsonl' (default), 'pickle', or 'parquet'.", - ) - parser.add_argument( - "--output-file-type", - type=str, - default="jsonl", - help="File type the dataset will be written to. Supported file formats" - " include 'jsonl' (default), 'pickle', or 'parquet'.", - ) - - parser = add_distributed_args(parser) return parser diff --git a/nemo_curator/scripts/text_cleaning.py b/nemo_curator/scripts/text_cleaning.py index 7ffb08e5..73b6fc23 100644 --- a/nemo_curator/scripts/text_cleaning.py +++ b/nemo_curator/scripts/text_cleaning.py @@ -19,11 +19,11 @@ from nemo_curator.modifiers import UnicodeReformatter from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk from nemo_curator.utils.file_utils import expand_outdir_and_mkdir, get_batched_files -from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args +from nemo_curator.utils.script_utils import ArgumentHelper def main(args): - client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) # Make the output directories output_clean_dir = expand_outdir_and_mkdir(args.output_clean_dir) @@ -71,50 +71,21 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) ): - parser.add_argument( - "--input-data-dir", - type=str, - default=None, - help="Input directory consisting of .jsonl files that are accessible " - "to all nodes. Use this for a distributed file system", - ) - parser.add_argument( - "--input-text-field", - type=str, - default="text", - help="The name of the field within each datapoint object of the input " - "file that contains the text.", - ) - parser.add_argument( - "--input-file-type", - type=str, - default="jsonl", - help="File type of the dataset to be read in. 
Supported file formats" - " include 'jsonl' (default), 'pickle', or 'parquet'.", - ) + argumentHelper = ArgumentHelper(parser) + + argumentHelper.add_arg_batch_size() + argumentHelper.add_arg_input_data_dir() + argumentHelper.add_arg_input_file_type() + argumentHelper.add_arg_input_text_field() + argumentHelper.add_arg_output_file_type() + argumentHelper.add_distributed_args() parser.add_argument( "--output-clean-dir", type=str, - default=None, required=True, - help="The output directory to where the cleaned " "jsonl files will be written", - ) - parser.add_argument( - "--output-file-type", - type=str, - default="jsonl", - help="File type the dataset will be written to. Supported file formats" - " include 'jsonl' (default), 'pickle', or 'parquet'.", - ) - parser.add_argument( - "--batch-size", - type=int, - default=64, - help="Number of files to read into memory at a time.", + help="The output directory to where the cleaned jsonl files will be written", ) - parser = add_distributed_args(parser) - return parser diff --git a/nemo_curator/scripts/train_fasttext.py b/nemo_curator/scripts/train_fasttext.py index 849949af..f773dfbd 100644 --- a/nemo_curator/scripts/train_fasttext.py +++ b/nemo_curator/scripts/train_fasttext.py @@ -22,6 +22,7 @@ from tqdm import tqdm from nemo_curator.utils.file_utils import get_all_files_paths_under +from nemo_curator.utils.script_utils import ArgumentHelper def main(args): @@ -119,6 +120,14 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) ): + argumentHelper = ArgumentHelper(parser) + + argumentHelper.add_arg_seed(default=1992) + argumentHelper.add_arg_output_train_file( + help="The concatenated, shuffled samples used " + "to train the skip-gram classifier", + default="./fasttext_samples.train", + ) parser.add_argument( "--fasttext-files-dir", type=str, @@ -135,17 +144,30 @@ def attach_args( "when preparing the data", ) parser.add_argument( - "--seed", + "--learning-rate", + type=float, + default=0.1, + 
help="The learning rate used to train the classifier", + ) + parser.add_argument( + "--num-epochs", type=int, - default=1992, - help="The seed used for randomly shuffling the documents", + default=5, + help="Number of epochs used to train the classifier", ) parser.add_argument( - "--output-train-file", + "--output-model", type=str, - default="./fasttext_samples.train", - help="The concatenated, shuffled samples used " - "to train the skip-gram classifier", + default=None, + required=True, + help="The output trained skip-gram classifier written as a FastText model", + ) + parser.add_argument( + "--output-predictions", + type=str, + default=None, + help="The output predictions on the validation data. If a file " + "is not specified, the predictions are not written to file", ) parser.add_argument( "--output-validation-file", @@ -160,13 +182,6 @@ def attach_args( default=0.9, help="The training validation split", ) - parser.add_argument( - "--output-model", - type=str, - default=None, - required=True, - help="The output trained skip-gram classifier written " "as a FastText model", - ) parser.add_argument( "--wordNgrams", type=int, @@ -174,32 +189,13 @@ def attach_args( help="The size of the word n-gram used to train the classifier " "(default is bigram)", ) - parser.add_argument( - "--learning-rate", - type=float, - default=0.1, - help="The learning rate used to train the classifier", - ) - parser.add_argument( - "--num-epochs", - type=int, - default=5, - help="Number of epochs used to train the classifier", - ) parser.add_argument( "--word-vector-dim", type=int, default=100, help="Size of word vectors to be computed by the model", ) - parser.add_argument( - "--output-predictions", - type=str, - default=None, - help="The output predictions on the validation data. 
" - "If a file is not specified, the predictions are not " - "written to file", - ) + return parser diff --git a/nemo_curator/scripts/verify_classification_results.py b/nemo_curator/scripts/verify_classification_results.py index 3a639349..f13f06d8 100644 --- a/nemo_curator/scripts/verify_classification_results.py +++ b/nemo_curator/scripts/verify_classification_results.py @@ -19,6 +19,8 @@ import pandas as pd +from nemo_curator.utils.script_utils import ArgumentHelper + def parse_args(): """ @@ -30,12 +32,14 @@ def parse_args(): """ parser = argparse.ArgumentParser(description="Run verification") + ArgumentHelper(parser).add_arg_input_meta() parser.add_argument( - "--results_file_path", + "--expected_pred_column", type=str, - required=True, - help="The path of the input files", + default="pred", + help="The prediction column name for the expected_result file", ) + parser.add_argument( "--expected_results_file_path", type=str, @@ -43,23 +47,16 @@ def parse_args(): help="The path of the expected_result file", ) parser.add_argument( - "--results_pred_column", + "--results_file_path", type=str, - default="pred", - help="The prediction column name for the input files", + required=True, + help="The path of the input files", ) parser.add_argument( - "--expected_pred_column", + "--results_pred_column", type=str, default="pred", - help="The prediction column name for the expected_result file", - ) - parser.add_argument( - "--input-meta", - type=str, - default=None, - help="A string formatted as a dictionary, which outlines the field names and " - "their respective data types within the JSONL input files.", + help="The prediction column name for the input files", ) return parser.parse_args() diff --git a/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py index 62e9b579..e6f12620 100644 --- a/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py +++ b/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py @@ -60,7 +60,7 @@ def 
_read_json_func( def get_text_ddf_from_json_path_with_blocksize( - input_data_paths, num_files, blocksize, id_column, text_column, input_meta + input_data_paths, num_files, blocksize, id_column, text_column, input_meta=None ): data_paths = [ entry.path for data_path in input_data_paths for entry in os.scandir(data_path) diff --git a/nemo_curator/utils/script_utils.py b/nemo_curator/utils/script_utils.py index a26883d4..32582daf 100644 --- a/nemo_curator/utils/script_utils.py +++ b/nemo_curator/utils/script_utils.py @@ -13,335 +13,486 @@ # limitations under the License. import argparse import os -from itertools import islice - - -def attach_bool_arg(parser, flag_name, default=False, help_str=None): - attr_name = flag_name.replace("-", "_") - parser.add_argument( - "--{}".format(flag_name), - dest=attr_name, - action="store_true", - help=flag_name.replace("-", " ") if help_str is None else help_str, - ) - parser.add_argument( - "--no-{}".format(flag_name), - dest=attr_name, - action="store_false", - help=flag_name.replace("-", " ") if help_str is None else help_str, - ) - parser.set_defaults(**{attr_name: default}) - - -def add_distributed_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser: - """ - Adds default set of arguments that are needed for Dask cluster setup - """ - parser.add_argument( - "--scheduler-address", - type=str, - default=None, - help="Address to the scheduler of a created dask cluster. If not provided" - "a single node Cluster will be started.", - ) - parser.add_argument( - "--scheduler-file", - type=str, - default=None, - help="Path to the scheduler file of a created dask cluster. 
If not provided" - " a single node Cluster will be started.", - ) - parser.add_argument( - "--n-workers", - type=int, - default=os.cpu_count(), - help="The number of workers to run in total on the Dask CPU cluster", - ) - parser.add_argument( - "--threads-per-worker", - type=int, - default=1, - help="The number of threads ot launch per worker on the Dask CPU cluster. Usually best set at 1 due to the GIL.", - ) - parser.add_argument( - "--rmm-pool-size", - type=str, - default=None, - help="Initial pool size to use for the RMM Pool Memory allocator" - "Note: This only applies to the LocalCUDACluster. If providing an user created " - "cluster refer to" - "https://docs.rapids.ai/api/dask-cuda/stable/api.html#cmdoption-dask-cuda-rmm-pool-size", # noqa: E501 - ) - parser.add_argument( - "--protocol", - type=str, - default="tcp", - help="Protcol to use for dask cluster" - "Note: This only applies to the localCUDACluster. If providing an user created " - "cluster refer to" - "https://docs.rapids.ai/api/dask-cuda/stable/api.html#cmdoption-dask-cuda-protocol", # noqa: E501 - ) - parser.add_argument( - "--nvlink-only", - action="store_true", - help="Start a local cluster with only NVLink enabled." - "Only applicable when protocol=ucx and no scheduler file/address is specified", - ) - parser.add_argument( - "--files-per-partition", - type=int, - default=2, - help="Number of jsonl files to combine into single partition", - ) - parser.add_argument( - "--num-files", - type=int, - default=None, - help="Upper limit on the number of json files to process", - ) - parser.add_argument( - "--device", - type=str, - default="cpu", - help="Device to run the script on. 
Either 'cpu' or 'gpu'.", - ) - - return parser - - -def parse_gpu_dedup_args( - description="Default gpu dedup nemo_curator argument parser", -) -> argparse.ArgumentParser: - """ - Adds default set of arguments that are common to multiple stages - of the pipeline - """ - parser = argparse.ArgumentParser( - description, - formatter_class=argparse.ArgumentDefaultsHelpFormatter, - ) - parser = add_distributed_args(parser) - - # Set default device to GPU for dedup - parser.set_defaults(device="gpu") - parser.add_argument( - "--input-data-dirs", - type=str, - nargs="+", - default=None, - required=False, - help="Input directories consisting of .jsonl files that are accessible " - "to all nodes. This path must be accessible by all machines in the cluster", - ) - parser.add_argument( - "--input-json-text-field", - type=str, - default="text", - help="The name of the field within each json object of the jsonl " - "file that contains the text from which minhashes will be computed. ", - ) - parser.add_argument( - "--input-json-id-field", - type=str, - default="adlr_id", - help="The name of the field within each json object of the jsonl " - "file that assigns a unqiue ID to each document. 
" - "Can be created by running the script " - "'./prospector/add_id.py' which adds the field 'adlr_id' " - "to the documents in a distributed fashion", - ) - parser.add_argument( - "--log-dir", - type=str, - default="./logs/", - help="The output log directory where node and local", - ) - parser.add_argument( - "--profile-path", - type=str, - default=None, - help="Path to save dask profile", - ) - return parser - - -def parse_client_args(args: argparse.Namespace): - """ - Extracts relevant arguments from an argparse namespace to pass to get_client - """ - relevant_args = [ - "scheduler_address", - "scheduler_file", - "n_workers", - "threads_per_worker", - "nvlink_only", - "protocol", - "rmm_pool_size", - "enable_spilling", - "set_torch_to_use_rmm", - ] - dict_args = vars(args) - - parsed_args = {arg: dict_args[arg] for arg in relevant_args if arg in dict_args} - if "device" in dict_args: - parsed_args["cluster_type"] = dict_args["device"] - - return parsed_args - - -def parse_distributed_classifier_args( - description="Default distributed classifier argument parser", -) -> argparse.ArgumentParser: + + +class ArgumentHelper: """ - Adds default set of arguments that are common to multiple stages - of the pipeline + A helper class to add common arguments to an argparse.ArgumentParser instance. 
""" - parser = argparse.ArgumentParser( - description, - formatter_class=argparse.ArgumentDefaultsHelpFormatter, - ) - parser = add_distributed_args(parser) - # Set low default RMM pool size for classifier - # to allow pytorch to grow its memory usage - # by default - parser.set_defaults(rmm_pool_size="512MB") - parser.add_argument( - "--input-data-dir", - type=str, - help="The path of the input files", - required=True, - ) - parser.add_argument( - "--output-data-dir", - type=str, - help="The path of the output files", - required=True, - ) - parser.add_argument( - "--model-path", - type=str, - help="The path to the model file", - required=True, - ) - parser.add_argument( - "--input-file-type", - type=str, - help="The type of the input files", - required=True, - ) - parser.add_argument( - "--output-file-type", - type=str, - default="jsonl", - help="The type of the output files", - required=False, - ) - parser.add_argument( - "--batch-size", - type=int, - default=128, - help="The batch size to be used for inference", - ) - attach_bool_arg( - parser, "autocast", default=True, help_str="Whether to use autocast or not" - ) - attach_bool_arg( - parser, - "enable-spilling", - default=True, - help_str="Whether to enable spilling or not", - ) - - # Setting to False makes it more stable for long running jobs - # possibly because of memory fragmentation - attach_bool_arg( - parser, - "set-torch-to-use-rmm", - default=False, - help_str="Whether to set torch to use RMM or not", - ) - - return parser - - -def chunk_list(lst, nchnks): - nitem = len(lst) - splits = splitnum(nitem, nchnks) - beg, end = 0, splits[0] - for i in range(nchnks): - if i == nchnks - 1: - yield lst[beg:] - else: - yield lst[beg:end] - beg = end - end += splits[i + 1] - - -def get_ranges(n, nchnks): - splits = splitnum(n, nchnks) - beg, end = 0, splits[0] - for i in range(nchnks): - if i == nchnks - 1: - yield beg, n - else: - yield beg, end - beg = end - end += splits[i + 1] - - -def chunk_list_lean(lst, 
# NOTE(review): the `class ArgumentHelper` header sits above the visible chunk;
# it is restated here only so the methods below stay bound to the class.
# Confirm base classes / class docstring against the full file.
class ArgumentHelper:
    """Helpers that register commonly used command-line options on an argparse parser."""

    def __init__(self, parser: argparse.ArgumentParser):
        """Keep a reference to ``parser``; the ``add_arg_*`` methods register options on it."""
        self.parser = parser

    @staticmethod
    def attach_bool_arg(
        parser: argparse.ArgumentParser,
        flag_name: str,
        default: bool = False,
        help: str = None,
    ):
        """Register a ``--<flag_name>`` / ``--no-<flag_name>`` pair of switches.

        Both options write to the same destination (``flag_name`` with dashes
        replaced by underscores): the plain form stores ``True`` and the
        ``--no-`` form stores ``False``.

        Args:
            parser: Parser that receives the two options.
            flag_name: Option name without the leading dashes.
            default: Value used when neither option is passed.
            help: Help text shown for both options. When ``None`` the flag
                name with dashes replaced by spaces is used.
        """
        attr_name = flag_name.replace("-", "_")
        help_text = flag_name.replace("-", " ") if help is None else help
        parser.add_argument(
            f"--{flag_name}",
            dest=attr_name,
            action="store_true",
            help=help_text,
        )
        parser.add_argument(
            f"--no-{flag_name}",
            dest=attr_name,
            action="store_false",
            help=help_text,
        )
        # Applied last so the shared destination has one well-defined default.
        parser.set_defaults(**{attr_name: default})

    def add_arg_batch_size(
        self,
        default: int = 64,
        help: str = "Number of files to read into memory at a time.",
    ):
        """Add the integer ``--batch-size`` option."""
        self.parser.add_argument(
            "--batch-size",
            type=int,
            default=default,
            help=help,
        )

    def add_arg_device(self):
        """Add the ``--device`` option (defaults to ``"gpu"``)."""
        self.parser.add_argument(
            "--device",
            type=str,
            default="gpu",
            help="Device to run the script on. Either 'cpu' or 'gpu'.",
        )

    def add_arg_enable_spilling(self):
        """Add the ``--enable-spilling`` switch (off unless passed)."""
        self.parser.add_argument("--enable-spilling", action="store_true")

    def add_arg_language(self, help: str):
        """Add the ``--language`` option (defaults to ``"en"``)."""
        self.parser.add_argument(
            "--language",
            type=str,
            default="en",
            help=help,
        )

    def add_arg_log_dir(self, default: str):
        """Add the ``--log-dir`` option with the caller-supplied default."""
        self.parser.add_argument(
            "--log-dir",
            type=str,
            default=default,
            help="The output log directory where node and local"
            " ranks will write their respective log files",
        )

    def add_arg_input_data_dir(
        self,
        help: str = "Input directory consisting of .jsonl files that are accessible "
        "to all nodes. Use this for a distributed file system",
    ):
        """Add the ``--input-data-dir`` option (defaults to ``None``)."""
        self.parser.add_argument(
            "--input-data-dir",
            type=str,
            default=None,
            help=help,
        )

    def add_arg_input_file_type(
        self,
        choices=None,
        help="File type of the dataset to be read in. Supported file formats "
        "include 'jsonl' (default), 'pickle', or 'parquet'.",
    ):
        """Add the ``--input-file-type`` option, optionally restricted to ``choices``."""
        self.parser.add_argument(
            "--input-file-type",
            type=str,
            default="jsonl",
            choices=choices,
            help=help,
        )

    def add_arg_input_local_data_dir(self):
        """Add the ``--input-local-data-dir`` option (defaults to ``None``)."""
        self.parser.add_argument(
            "--input-local-data-dir",
            type=str,
            default=None,
            help="Input directory consisting of dataset files. "
            "Use this argument when a distributed file system is not available.",
        )

    def add_arg_input_meta(self):
        """Add the ``--input-meta`` option (defaults to ``None``)."""
        self.parser.add_argument(
            "--input-meta",
            type=str,
            default=None,
            help="A string formatted as a dictionary, which outlines the field names and "
            "their respective data types within the JSONL input files.",
        )

    def add_arg_input_text_field(self):
        """Add the ``--input-text-field`` option (defaults to ``"text"``)."""
        self.parser.add_argument(
            "--input-text-field",
            type=str,
            default="text",
            help="The name of the field within each datapoint object of the input "
            "file that contains the text.",
        )

    def add_arg_minhash_length(self):
        """Add the integer ``--minhash-length`` option (defaults to 260)."""
        self.parser.add_argument(
            "--minhash-length",
            type=int,
            default=260,
            help="The minhash signature length of each input document",
        )

    def add_arg_nvlink_only(self):
        """Add the ``--nvlink-only`` switch (off unless passed)."""
        self.parser.add_argument(
            "--nvlink-only",
            action="store_true",
            help="Start a local cluster with only NVLink enabled. "
            "Only applicable when protocol=ucx and no scheduler file/address is specified",
        )

    def add_arg_output_data_dir(self, help: str):
        """Add the required ``--output-data-dir`` option."""
        self.parser.add_argument(
            "--output-data-dir",
            type=str,
            required=True,
            help=help,
        )

    def add_arg_output_dir(
        self, required=False, help: str = "The output directory to write results in"
    ):
        """Add the ``--output-dir`` option; ``required`` controls whether it is mandatory."""
        self.parser.add_argument(
            "--output-dir",
            type=str,
            required=required,
            help=help,
        )

    def add_arg_output_file_type(
        self,
        choices=None,
        help="File type the dataset will be written to. Supported file formats "
        "include 'jsonl' (default), 'pickle', or 'parquet'.",
    ):
        """Add the ``--output-file-type`` option, optionally restricted to ``choices``."""
        self.parser.add_argument(
            "--output-file-type",
            type=str,
            default="jsonl",
            choices=choices,
            help=help,
        )

    def add_arg_output_train_file(self, help: str, default: str = None):
        """Add the ``--output-train-file`` option."""
        self.parser.add_argument(
            "--output-train-file",
            type=str,
            default=default,
            help=help,
        )

    def add_arg_protocol(self):
        """Add the ``--protocol`` option (defaults to ``"ucx"``)."""
        self.parser.add_argument(
            "--protocol",
            type=str,
            default="ucx",
            help="Protocol to use for dask cluster. "
            "Note: This only applies to the LocalCUDACluster. If providing a user-created "
            "cluster refer to "
            "https://docs.rapids.ai/api/dask-cuda/stable/api.html#cmdoption-dask-cuda-protocol",  # noqa: E501
        )

    def add_arg_rmm_pool_size(self):
        """Add the ``--rmm-pool-size`` option (defaults to ``"14GB"``)."""
        self.parser.add_argument(
            "--rmm-pool-size",
            type=str,
            default="14GB",
            help="Initial pool size to use for the RMM Pool Memory allocator. "
            "Note: This only applies to the LocalCUDACluster. If providing a user-created "
            "cluster refer to "
            "https://docs.rapids.ai/api/dask-cuda/stable/api.html#cmdoption-dask-cuda-rmm-pool-size",  # noqa: E501
        )

    def add_arg_scheduler_address(self):
        """Add the ``--scheduler-address`` option (defaults to ``None``)."""
        self.parser.add_argument(
            "--scheduler-address",
            type=str,
            default=None,
            help="Address to the scheduler of a created dask cluster. If not provided "
            "a single node LocalCUDACluster will be started.",
        )

    def add_arg_scheduler_file(self):
        """Add the ``--scheduler-file`` option (defaults to ``None``)."""
        self.parser.add_argument(
            "--scheduler-file",
            type=str,
            default=None,
            help="Path to the scheduler file of a created dask cluster. If not provided"
            " a single node LocalCUDACluster will be started.",
        )

    def add_arg_seed(
        self,
        default=42,
        help: str = "If specified, the random seed used for shuffling.",
    ):
        """Add the integer ``--seed`` option."""
        self.parser.add_argument(
            "--seed",
            type=int,
            default=default,
            help=help,
        )

    def add_arg_set_torch_to_use_rmm(self):
        """Add the ``--set-torch-to-use-rmm`` switch (off unless passed)."""
        self.parser.add_argument("--set-torch-to-use-rmm", action="store_true")

    def add_arg_shuffle(self, help: str):
        """Add the ``--shuffle`` / ``--no-shuffle`` pair (defaults to ``False``)."""
        ArgumentHelper.attach_bool_arg(
            self.parser,
            "shuffle",
            help=help,
        )

    def add_arg_text_ddf_blocksize(self):
        """Add the integer ``--text-ddf-blocksize`` option (defaults to 256)."""
        self.parser.add_argument(
            "--text-ddf-blocksize",
            type=int,
            default=256,
            help="The block size for chunking jsonl files for text ddf in mb",
        )

    def add_distributed_args(self) -> argparse.ArgumentParser:
        """Add the default set of options needed for Dask cluster setup.

        Registers ``--device``, ``--files-per-partition``, ``--n-workers``,
        ``--num-files``, ``--nvlink-only``, ``--protocol``, ``--rmm-pool-size``,
        ``--scheduler-address``, ``--scheduler-file`` and
        ``--threads-per-worker`` on the wrapped parser.

        Note that the defaults here (``device="cpu"``, ``protocol="tcp"``,
        ``rmm_pool_size=None``) differ from those of the individual
        ``add_arg_*`` methods.

        Returns:
            The wrapped parser, so calls can be chained.
        """
        self.parser.add_argument(
            "--device",
            type=str,
            default="cpu",
            help="Device to run the script on. Either 'cpu' or 'gpu'.",
        )
        self.parser.add_argument(
            "--files-per-partition",
            type=int,
            default=2,
            help="Number of jsonl files to combine into single partition",
        )
        self.parser.add_argument(
            "--n-workers",
            type=int,
            default=os.cpu_count(),
            help="The number of workers to run in total on the Dask CPU cluster",
        )
        self.parser.add_argument(
            "--num-files",
            type=int,
            default=None,
            help="Upper limit on the number of json files to process",
        )
        self.parser.add_argument(
            "--nvlink-only",
            action="store_true",
            help="Start a local cluster with only NVLink enabled. "
            "Only applicable when protocol=ucx and no scheduler file/address is specified",
        )
        self.parser.add_argument(
            "--protocol",
            type=str,
            default="tcp",
            help="Protocol to use for dask cluster. "
            "Note: This only applies to the LocalCUDACluster. If providing a user-created "
            "cluster refer to "
            "https://docs.rapids.ai/api/dask-cuda/stable/api.html#cmdoption-dask-cuda-protocol",  # noqa: E501
        )
        self.parser.add_argument(
            "--rmm-pool-size",
            type=str,
            default=None,
            help="Initial pool size to use for the RMM Pool Memory allocator. "
            "Note: This only applies to the LocalCUDACluster. If providing a user-created "
            "cluster refer to "
            "https://docs.rapids.ai/api/dask-cuda/stable/api.html#cmdoption-dask-cuda-rmm-pool-size",  # noqa: E501
        )
        self.parser.add_argument(
            "--scheduler-address",
            type=str,
            default=None,
            help="Address to the scheduler of a created dask cluster. If not provided "
            "a single node Cluster will be started.",
        )
        self.parser.add_argument(
            "--scheduler-file",
            type=str,
            default=None,
            help="Path to the scheduler file of a created dask cluster. If not provided"
            " a single node Cluster will be started.",
        )
        self.parser.add_argument(
            "--threads-per-worker",
            type=int,
            default=1,
            help="The number of threads to launch per worker on the Dask CPU cluster. "
            "Usually best set at 1 due to the GIL.",
        )

        return self.parser

    @staticmethod
    def parse_client_args(args: argparse.Namespace):
        """Extract the arguments relevant to ``get_client`` from a parsed namespace.

        Only attributes actually present on ``args`` are copied, so parsers
        that registered a subset of the cluster options work as well. A
        ``device`` attribute, when present, is forwarded under the key
        ``cluster_type``.

        Returns:
            A dict of keyword arguments.
        """
        relevant_args = (
            "scheduler_address",
            "scheduler_file",
            "n_workers",
            "threads_per_worker",
            "nvlink_only",
            "protocol",
            "rmm_pool_size",
            "enable_spilling",
            "set_torch_to_use_rmm",
        )
        dict_args = vars(args)

        parsed_args = {
            name: dict_args[name] for name in relevant_args if name in dict_args
        }
        if "device" in dict_args:
            parsed_args["cluster_type"] = dict_args["device"]

        return parsed_args

    @staticmethod
    def parse_distributed_classifier_args(
        description="Default distributed classifier argument parser",
    ) -> argparse.ArgumentParser:
        """Build a parser with the options shared by the distributed classifier scripts.

        Starts from the options of ``add_distributed_args``, lowers the default
        RMM pool size and adds the input/output, model and batching options
        plus the ``autocast``, ``enable-spilling`` and ``set-torch-to-use-rmm``
        boolean pairs.

        Args:
            description: Text shown as the parser description in ``--help``.

        Returns:
            The configured parser (not yet parsed).
        """
        # `description` must be passed by keyword: the first positional
        # parameter of ArgumentParser is `prog`, not `description`.
        parser = argparse.ArgumentParser(
            description=description,
            formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        )
        parser = ArgumentHelper(parser).add_distributed_args()
        # Set low default RMM pool size for classifier
        # to allow pytorch to grow its memory usage
        # by default
        parser.set_defaults(rmm_pool_size="512MB")
        parser.add_argument(
            "--input-data-dir",
            type=str,
            help="The path of the input files",
            required=True,
        )
        parser.add_argument(
            "--output-data-dir",
            type=str,
            help="The path of the output files",
            required=True,
        )
        parser.add_argument(
            "--model-path",
            type=str,
            help="The path to the model file",
            required=True,
        )
        parser.add_argument(
            "--input-file-type",
            type=str,
            help="The type of the input files",
            required=True,
        )
        parser.add_argument(
            "--output-file-type",
            type=str,
            default="jsonl",
            help="The type of the output files",
        )
        parser.add_argument(
            "--batch-size",
            type=int,
            default=128,
            help="The batch size to be used for inference",
        )
        ArgumentHelper.attach_bool_arg(
            parser, "autocast", default=True, help="Whether to use autocast or not"
        )
        ArgumentHelper.attach_bool_arg(
            parser,
            "enable-spilling",
            default=True,
            help="Whether to enable spilling or not",
        )

        # Setting to False makes it more stable for long running jobs
        # possibly because of memory fragmentation
        ArgumentHelper.attach_bool_arg(
            parser,
            "set-torch-to-use-rmm",
            default=False,
            help="Whether to set torch to use RMM or not",
        )

        return parser

    @staticmethod
    def parse_gpu_dedup_args(description: str) -> argparse.ArgumentParser:
        """Build a parser with the options shared by the GPU deduplication scripts.

        Starts from the options of ``add_distributed_args``, switches the
        default device to ``"gpu"`` and adds the input directory, JSON field,
        logging and profiling options.

        Args:
            description: Text shown as the parser description in ``--help``.

        Returns:
            The configured parser (not yet parsed).
        """
        # `description` must be passed by keyword: the first positional
        # parameter of ArgumentParser is `prog`, not `description`.
        parser = argparse.ArgumentParser(
            description=description,
            formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        )

        helper = ArgumentHelper(parser)
        helper.add_distributed_args()

        # Set default device to GPU for dedup
        helper.parser.set_defaults(device="gpu")
        helper.parser.add_argument(
            "--input-data-dirs",
            type=str,
            nargs="+",
            default=None,
            help="Input directories consisting of .jsonl files that are accessible "
            "to all nodes. This path must be accessible by all machines in the cluster",
        )
        helper.parser.add_argument(
            "--input-json-text-field",
            type=str,
            default="text",
            help="The name of the field within each json object of the jsonl "
            "file that contains the text from which minhashes will be computed.",
        )
        helper.parser.add_argument(
            "--input-json-id-field",
            type=str,
            default="adlr_id",
            help="The name of the field within each json object of the jsonl "
            "file that assigns a unique ID to each document. "
            "Can be created by running the script "
            "'./prospector/add_id.py' which adds the field 'adlr_id' "
            "to the documents in a distributed fashion",
        )
        helper.parser.add_argument(
            "--log-dir",
            type=str,
            default="./logs/",
            help="The output log directory where node and local"
            " ranks will write their respective log files",
        )
        helper.parser.add_argument(
            "--profile-path",
            type=str,
            default=None,
            help="Path to save dask profile",
        )

        return helper.parser
args.n_workers = min(args.n_workers, 8) diff --git a/tutorials/single_node_tutorial/single_gpu_tutorial.ipynb b/tutorials/single_node_tutorial/single_gpu_tutorial.ipynb index 0653279b..36dc4f84 100755 --- a/tutorials/single_node_tutorial/single_gpu_tutorial.ipynb +++ b/tutorials/single_node_tutorial/single_gpu_tutorial.ipynb @@ -121,7 +121,7 @@ "import argparse\n", "\n", "from nemo_curator.utils.distributed_utils import get_client,get_num_workers\n", - "from nemo_curator.utils.script_utils import add_distributed_args\n", + "from nemo_curator.utils.script_utils import ArgumentHelper\n", "from nemo_curator.utils.file_utils import get_all_files_paths_under, separate_by_metadata\n", "from nemo_curator.utils.distributed_utils import read_data,write_to_disk\n", "from nemo_curator.datasets import DocumentDataset\n", @@ -151,7 +151,7 @@ " import cudf \n", "\n", "def attach_args(parser=argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)):\n", - " return add_distributed_args(parser)\n", + " return ArgumentHelper(parser).add_distributed_args()\n", "\n", "def check_jsonl_file(file_dir):\n", " for file in os.listdir(file_dir):\n", diff --git a/tutorials/tinystories/main.py b/tutorials/tinystories/main.py index 0d81c1ff..d4dff1ee 100644 --- a/tutorials/tinystories/main.py +++ b/tutorials/tinystories/main.py @@ -31,7 +31,7 @@ from nemo_curator.modules.modify import Modify from nemo_curator.utils.distributed_utils import get_client from nemo_curator.utils.file_utils import get_all_files_paths_under -from nemo_curator.utils.script_utils import add_distributed_args, parse_client_args +from nemo_curator.utils.script_utils import ArgumentHelper SCRIPT_DIR_PATH = os.path.dirname(os.path.abspath(__file__)) DATA_DIR = os.path.join(SCRIPT_DIR_PATH, "data") @@ -173,7 +173,7 @@ def run_curation_pipeline(args: Any, jsonl_dir: str) -> None: jsonl_dir (str): Directory path where the JSONL files are stored. """ # Initialize the Dask cluster. 
- client = get_client(**parse_client_args(args)) + client = get_client(**ArgumentHelper.parse_client_args(args)) print(f"Running curation pipeline on '{jsonl_dir}'...") files = [ fp @@ -181,9 +181,7 @@ def run_curation_pipeline(args: Any, jsonl_dir: str) -> None: if fp.endswith(".jsonl") ] print("Reading the data...") - orig_dataset = DocumentDataset.read_json( - files, add_filename=True, input_meta=args.input_meta - ) + orig_dataset = DocumentDataset.read_json(files, add_filename=True) dataset = orig_dataset curation_steps = Sequential( @@ -215,15 +213,7 @@ def run_curation_pipeline(args: Any, jsonl_dir: str) -> None: def main(): parser = argparse.ArgumentParser() - parser = add_distributed_args(parser) - parser.add_argument( - "--input-meta", - type=str, - default=None, - help="A string formatted as a dictionary, which outlines the field names and " - "their respective data types within the JSONL input files.", - ) - args = parser.parse_args() + args = ArgumentHelper(parser).add_distributed_args().parse_args() # Limit the total number of workers to ensure we don't run out of memory. args.n_workers = min(args.n_workers, 8)