From 1326299eb4592116db414dcd9d217ca1f79ec42f Mon Sep 17 00:00:00 2001 From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Date: Fri, 11 Oct 2024 14:24:45 -0700 Subject: [PATCH 1/3] Update requirements_cuda12x.txt Signed-off-by: Sarah Yurick --- requirements/requirements_cuda12x.txt | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/requirements/requirements_cuda12x.txt b/requirements/requirements_cuda12x.txt index 99e2ede7..99237bf7 100644 --- a/requirements/requirements_cuda12x.txt +++ b/requirements/requirements_cuda12x.txt @@ -1,6 +1,6 @@ -cudf-cu12>=24.8 -cugraph-cu12>=24.8 -cuml-cu12>=24.8 -dask-cuda>=24.8 -dask-cudf-cu12>=24.8 +cudf-cu12>=24.10 +cugraph-cu12>=24.10 +cuml-cu12>=24.10 +dask-cuda>=24.10 +dask-cudf-cu12>=24.10 spacy[cuda12x]>=3.6.0, <3.8.0 From f6e72ba5e1b4b462c84e0a43f50540f5d8ce9e1a Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Mon, 14 Oct 2024 15:11:17 -0700 Subject: [PATCH 2/3] Update deduplication docs (#258) * initial exact dedup doc updates Signed-off-by: Ayush Dattagupta * More fuzzy dedup doc updates Signed-off-by: Ayush Dattagupta * more updates Signed-off-by: Ayush Dattagupta * Add semdedup to GPU modules Signed-off-by: Ayush Dattagupta * Apply suggestions from code review Co-authored-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Signed-off-by: Ayush Dattagupta * Address reviews Signed-off-by: Ayush Dattagupta * Apply suggestions from code review Co-authored-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Signed-off-by: Ayush Dattagupta * address more review comments Signed-off-by: Ayush Dattagupta * Apply suggestions from code review Co-authored-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Signed-off-by: Ayush Dattagupta * Fix position of message for exact dedup api Signed-off-by: Ayush Dattagupta * Add id field param to cli scripts Signed-off-by: Ayush Dattagupta --------- Signed-off-by: Ayush Dattagupta Co-authored-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Signed-off-by: Sarah Yurick --- docs/user-guide/cpuvsgpu.rst | 1 + docs/user-guide/gpudeduplication.rst | 476 ++++++++++++++++++++------- 2 files changed, 361 insertions(+), 116 deletions(-) diff --git a/docs/user-guide/cpuvsgpu.rst b/docs/user-guide/cpuvsgpu.rst index 80e40291..683723b2 100644 --- a/docs/user-guide/cpuvsgpu.rst +++ b/docs/user-guide/cpuvsgpu.rst @@ -64,6 +64,7 @@ The following NeMo Curator modules are GPU based. * Exact Deduplication * Fuzzy Deduplication +* Semantic Deduplication * Distributed Data Classification * Domain Classification diff --git a/docs/user-guide/gpudeduplication.rst b/docs/user-guide/gpudeduplication.rst index 407c2319..d5f84e73 100644 --- a/docs/user-guide/gpudeduplication.rst +++ b/docs/user-guide/gpudeduplication.rst @@ -5,164 +5,409 @@ GPU Accelerated Exact and Fuzzy Deduplication ####################################################### ------------------------------------------ +========================================= Background ------------------------------------------ +========================================= + +The exact and fuzzy document-level deduplication modules in NeMo Curator aim to reduce the occurrence of duplicate and +near-duplicate documents in a dataset. Both functionalities are supported in NeMo Curator and accelerated using `RAPIDS `_. -Training on randomly selected documents for many epochs can be sub-optimal to downstream performance for language models. 
+The main motivation for this is that training on randomly selected documents for many epochs can be sub-optimal to downstream performance for language models. For more information on when this is harmful, please see `Muennighoff et al., 2023 `_ and `Tirumala et al., 2023 `_. -The exact and fuzzy document-level deduplication module in the NeMo Curator aims at reducing the occurence of duplicate and -near-duplicate documents in the dataset. Exact deduplication refers to removing identical (i.e., document strings are equal) -documents from the dataset, while fuzzy deduplication refers to removing near-identical (e.g., an excerpt of a document is used in another document) -documents from the dataset. -Both functionalities are supported in NeMo Curator and accelerated using `RAPIDS `_. +========================================= +Exact Deduplication +========================================= + +Exact deduplication refers to removing identical documents (i.e., document strings that are equal) from the dataset. + +As exact deduplication requires significantly less compute, we typically will run exact deduplication before fuzzy deduplication. +Also, from our experience in deduplicating Common Crawl snapshots, a significant portion (as high as ~40%) of the duplicates can be exact duplicates. + +----------------------------------------- +How It Works +----------------------------------------- + Exact dedpulication works by hashing each document and only keeping one document per hash. -Fuzzy deduplication is more involved and follows the method outlined in `Microsoft Turing NLG 530B `_. +Running exact deduplication works on both CPU- and GPU-based backends. ----------------------------------------- Usage ----------------------------------------- -As exact deduplication is a much less involved procedure and requires significantly less compute, -we typically will first run exact deduplication before fuzzy deduplication. Also, from our experience in -deduplicating Common Crawl snapshots, a significant portion of the duplicates are in fact exact duplicates. -When removing near-duplicates within the corpus we perform fuzzy deduplication at the document level in order to remove documents that -have high Jaccard similarity. Our approach closely resembles the approach described in `Smith et al., 2020 `_. This -approach can essentially be split into two conceptual changes. The first stage involves computing MinHashes Signatures on -documents and then performing Locality Sensitive Hashing (LSH) to find candidate duplucates. Due to the approximate nature of the bucketing via MinHash + LSH -(`Leskovec et al., 2020 `_) we process each of the buckets to remove any potential false positives that may have been hashed into the buckets. +.. _exactdup_pyapi: +"""""""""""" +Python API +"""""""""""" +.. note:: + Before running exact deduplication, you need to ensure that the dataset contains a unique ID for each document. + If needed, you can use the :code:`add_id` module within NeMo Curator to accomplish this. -Before running either of these modules, users should assign a unique document ID to each document in the corpus. -This can be accomplished using the :code:`add_id` module within the NeMo Curator: + .. code-block:: python -.. 
code-block:: bash + from nemo_curator import AddId + from nemo_curator.datasets import DocumentDataset + + add_id = AddId(id_field="my_id", id_prefix="doc_prefix") + dataset = DocumentDataset.read_json("input_file_path") + id_dataset = add_id(dataset) + id_dataset.to_parquet("/path/to/parquet/data") + +After ensuring your dataset has a unique ID field (or creating one with the code above), you can perform exact deduplication as follows: - add_id \ - --id-field-name="my_id" \ - --input-data-dir= \ - --log-dir=./log/add_id +.. code-block:: python -This will create a new field named :code:`my_id` within each json document which will have the form "doc_prefix-000001". -If the dataset already has a unique ID this step can be skipped. + from nemo_curator import ExactDuplicates + from nemo_curator.datasets import DocumentDataset -**Note**: Fuzzy deduplication only works with numeric ID's or the specific ID format generated by the :code:`add_id` script. If the -dataset does not contain ID's in this format it's recommended to convert to an integer based ID or ID created by the :code:`add_id` script. + # Initialize the deduplication object + ExactDups = ExactDuplicates(id_field="my_id", text_field="text") -Once a unique ID has been added to each document, users can proceed with exact and fuzzy deduplication which roughly require the following -steps (all scripts are included in the :code:`nemo_curator/scripts/` subdirectory): + dataset = DocumentDataset.read_parquet( + input_files="/path/to/parquet/data", + backend="cudf", # or "pandas" for CPU + ) -* Exact dedup + duplicate_docs = ExactDups(dataset) + + """ + Sample output: + my_id _hashes + 22 doc_prefix-37820 e7cb1e88a7a30ea101d33e0c4c8857ef + 70 doc_prefix-56261 bfce4501b9caa93cb3daccd6db1f13af + 75 doc_prefix-56271 bfce4501b9caa93cb3daccd6db1f13af + 84 doc_prefix-52261 0f763a2937d57b9d96bf9f220e55f2bd + 107 doc_prefix-52271 0f763a2937d57b9d96bf9f220e55f2bd + """ + +.. tip:: + A more comprehensive example, including how to remove documents from a corpus using the list of + duplicate IDs generated from the exact deduplication step above, can be found in `examples/exact_deduplication.py `_. + +"""""""""""" +CLI Utility +"""""""""""" +Assuming that a unique ID has been added to each document, users can proceed with finding exact duplicates +as follows: + +* Find Exact Duplicates 1. Input: Data directories - 2. Output: _exact_duplicates.parquet. List of exact duplicates and the document hash. + 2. Output: ``_exact_duplicates.parquet``. List of exact duplicates and the document hash. -* Fuzzy Dedup +.. code-block:: bash - 1. Compute Minhashes - - Input: Data Directories - - Output: minhashes.parquet for each data dir. - - Example call: + # same as `python nemo_curator/scripts/find_exact_duplicates.py` + gpu_exact_dups \ + --input-data-dirs /path/to/jsonl/dir1 /path/to/jsonl/dir2 \ + --output-dir /path/to/output_dir \ + --input-json-text-field text_column_name \ + --input-json-id-field id_column_name \ + --log-dir ./ + # --scheduler-file /path/to/file.json - .. code-block:: bash +All CLI scripts are included in the :code:`nemo_curator/scripts/` subdirectory. 
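+
+The ``_exact_duplicates.parquet`` output only lists the duplicate documents; removing them from the corpus is a separate
+post-processing step. The snippet below is a minimal sketch of that step using plain pandas. It is an illustration rather
+than a documented interface: it assumes the output contains the ID column passed via ``--input-json-id-field`` together
+with the ``_hashes`` column shown in the Python API sample output, and all file paths are placeholders.
+
+.. code-block:: python
+
+    import pandas as pd
+
+    # List of exact duplicates produced by the step above.
+    dupes = pd.read_parquet("/path/to/output_dir/_exact_duplicates.parquet")
+
+    # Keep the first document per hash; every other document in a hash group is removable.
+    ids_to_remove = dupes[dupes["_hashes"].duplicated(keep="first")]["id_column_name"]
+
+    # Filter the original corpus (placeholder path) by those IDs.
+    corpus_df = pd.read_json("/path/to/jsonl/dir1/file.jsonl", lines=True)
+    deduped_df = corpus_df[~corpus_df["id_column_name"].isin(ids_to_remove)]
+
+For a complete, distributed version of this step, see the ``examples/exact_deduplication.py`` example referenced in the tip above.
+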
- # same as `python compute_minhashes.py` - gpu_compute_minhashes \ - --input-data-dirs /path/to/jsonl/dir1 /path/to/jsonl/dir2 \ - --output-minhash-dir /path/to/output_minhashes \ - --input-json-text-field text_column_name \ - --input-json-id-field id_column_name \ - --minhash-length number_of_hashes \ - --char-ngram char_ngram_size \ - --hash-bytes 4(or 8 byte hashes) \ - --seed 42 \ - --log-dir ./ - # --scheduler-file /path/to/file.json +.. caution:: + The CLI utilities are limited to JSONL datasets and only work with GPU-based backends. + For different dataset formats or backends use the :ref:`exactdup_pyapi`. +========================================= +Fuzzy Deduplication +========================================= - 2. Buckets (Minhash Buckets) - - Input: Minhash directories - - Output: Buckets.parquet - - Example call: +When removing near-duplicates within the corpus, we perform fuzzy deduplication at the document level in order to remove documents with +high Jaccard similarity scores. Our approach closely resembles the approach described in `Smith et al., 2020 `_. - .. code-block:: bash +----------------------------------------- +How It Works +----------------------------------------- - # same as `python minhash_lsh.py` - minhash_buckets \ - --input-data-dirs /path/to/output_minhashes/dir1 /path/to/output_minhashes/dir2 \ - --output-bucket-dir /path/to/dedup_output \ - --input-minhash-field _minhash_signature \ - --input-json-id-field id_column_name \ - --minhash-length number_of_hashes \ - --num-bands num_bands \ - --buckets-per-shuffle 1 `#Value b/w [1-num_bands]. Higher is better but might lead to oom` \ - --log-dir ./ - # --scheduler-file /path/to/file.json +This approach can essentially be split into the following stages: + +1. **Compute Minhashes**: The first stage involves computing `MinHash `_ Signatures on documents. + NeMo Curator currently only supports character-based n-grams for MinHashing. An approximate metric of ~4.5 characters per word can be used to determine the n-gram size for users familiar with word-based ngrams. +2. **LSH** *(Locality Sensitive Hashing)*: Perform `LSH `_ + to find candidate duplicates. + +3. **Buckets to Edgelist**: If not using the false positive check, we directly convert the LSH buckets to edges for the connected components computation. + +3. **False Positive Check** *(optional alternative to Buckets to Edgelist)*: Due to the approximate nature of the bucketing via MinHash + LSH + (`Leskovec et al., 2020 `_), NeMo Curator provides the option to further + process each of the buckets by computing some pairwise Jaccard similarity scores between documents in each bucket and filter out false positives that might have been hashed into the same bucket. + + a. **Jaccard Map Buckets:** Since buckets generated by LSH can have high cardinality, we map multiple LSH buckets to larger batches for + efficient processing. Aditionally we assign a few documents (controlled via :code:`num_anchor_docs`) for each bucket to be candidate documents + for pairwise Jaccard similarity computations within that bucket. + b. **Jaccard Shuffle**: Store documents from the original dataset into new directories and files such that all documents in the same batch (bucket) + are stored together. This allows parallelizing pairwise Jaccard similarity computations across different buckets. + c. **Jaccard Compute**: Compute Jaccard similarity scores between all pairs of documents in each bucket to the candidate anchor docs. - 3. 
Jaccard Map Buckets - - Input: Buckets.parquet + Data Dir - - Output: anchor_docs_with_bk.parquet +4. **Connected Components**: Due to the approximate nature of LSH, documents that are near duplicates may be assigned into different buckets with a few overlapping documents + between these buckets. We use a GPU accelerated connected components algorithm to find all connected components in the graph formed by the edges between documents in the same bucket. + +The result from the connected components step is a list of document IDs and the group they belong to. +All documents in the same group are considered near duplicates. +These results can be used to remove the near duplicates from the corpus. + +----------------------------------------- +Usage +----------------------------------------- + +.. _fuzzydup_pyapi: + +"""""""""""" +Python API +"""""""""""" + +.. note:: + Before running fuzzy deduplication, you need to ensure that the dataset contains a unique ID for each document. + If needed, you can use the ``add_id`` module within NeMo Curator to accomplish this. + + .. code-block:: python + + from nemo_curator import AddId + from nemo_curator.datasets import DocumentDataset + + add_id = AddId(id_field="my_id", id_prefix="doc_prefix") + dataset = DocumentDataset.read_json("input_file_path") + id_dataset = add_id(dataset) + id_dataset.to_json("/path/to/jsonl/data") + +1. Configuration + + a. Using the API Directlty + + .. code-block:: python + + from nemo_curator import FuzzyDuplicatesConfig + + config = FuzzyDuplicatesConfig( + cache_dir="/path/to/dedup_outputs", + id_field="my_id", + text_field="text", + seed=42, + char_ngrams=24, + num_buckets=20, + hashes_per_bucket=13, + use_64_bit_hash=False, + buckets_per_shuffle=2, + false_positive_check=False, + ) + + b. Using a YAML file + + .. code-block:: yaml + + cache_dir: /path/to/dedup_outputs + id_field: my_id + text_field: text + seed: 42 + char_ngrams: 24 + num_buckets: 20 + hashes_per_bucket: 13 + use_64_bit_hash: False + buckets_per_shuffle: 2 + false_positive_check: False + + .. code-block:: python + + from nemo_curator import FuzzyDuplicatesConfig + + config = FuzzyDuplicatesConfig.from_yaml("/path/to/config.yaml") + + +2. Usage Post Configuration + +.. code-block:: python + + from nemo_curator import FuzzyDuplicates + from nemo_curator.datasets import DocumentDataset + + # Initialize the deduplication object + FuzzyDups = FuzzyDuplicates(config=config, logger="./") + + dataset = DocumentDataset.read_json( + input_files="/path/to/jsonl/data", + backend="cudf", # FuzzyDuplicates only supports datasets with the cuDF backend. + ) + + duplicate_docs = FuzzyDups(dataset) + """ + Sample output: + my_id group + 0 doc_prefix-56151 32 + 1 doc_prefix-47071 590 + 2 doc_prefix-06840 305 + 3 doc_prefix-20910 305 + 4 doc_prefix-42050 154 + """ + +.. tip:: + + - A more comprehensive example for the above, including how to remove documents from a corpus using the list of + duplicate IDs generated from fuzzy deduplication, can be found in `examples/fuzzy_deduplication.py `_. + - The default values of ``num_buckets`` and ``hashes_per_bucket`` are set to find documents with an approximately Jaccard similarity of 0.8 or above. + - Higher ``buckets_per_shuffle`` values can lead to better performance but might lead to out of memory errors. + - Setting the ``false_positive_check`` flag to ``False`` is ideal for optimal performance. + - Clear the ``cache_dir`` between runs to avoid data from previous runs interfering with the current run's results. 
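+
+The output above only identifies the near-duplicate groups; removing them from the corpus is left to the user. Below is a
+minimal sketch of that removal step (keeping one representative document per group). It reuses the ``dataset``,
+``duplicate_docs``, and ``my_id`` names from the example above, the output path is a placeholder, and the complete version
+lives in the ``examples/fuzzy_deduplication.py`` example referenced in the tip.
+
+.. code-block:: python
+
+    # Within each duplicate group, keep the first document and mark the rest for removal.
+    docs_to_remove = duplicate_docs.df.map_partitions(
+        lambda partition: partition[partition["group"].duplicated(keep="first")]
+    )
+    duplicate_ids = docs_to_remove["my_id"].compute()
+
+    # Drop the marked documents from the original dataset and write the result.
+    deduped = DocumentDataset(dataset.df[~dataset.df["my_id"].isin(duplicate_ids)])
+    deduped.to_parquet("/path/to/deduped/output")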
+ +"""""""""""" +CLI Utility +"""""""""""" + +.. caution:: + Fuzzy deduplication CLI scripts only work with the specific ID format generated by the :code:`add_id` script. If the + dataset does not contain IDs in this format, it is recommended to create them with the :code:`add_id` script as follows: + + .. code-block:: bash + + add_id \ + --id-field-name="my_id" \ + --input-data-dir= \ + --id-prefix="doc_prefix" \ + --log-dir=./log/add_id + + This will create a new field named :code:`my_id` within each JSON document which will have the form "doc_prefix-000001". + If the dataset already has a unique ID this step can be skipped. + +Once a unique ID has been added to each document, users can proceed with fuzzy deduplication, which roughly require the following +steps (all scripts are included in the `nemo_curator/scripts/fuzzy_deduplication `_ subdirectory): + +1. Compute Minhashes + - Input: Data directories + - Output: ``minhashes.parquet`` for each data directory + - Example call: + + .. code-block:: bash + + # same as `python compute_minhashes.py` + gpu_compute_minhashes \ + --input-data-dirs /path/to/jsonl/dir1 /path/to/jsonl/dir2 \ + --output-minhash-dir /path/to/output_minhashes \ + --input-json-text-field text_column_name \ + --input-json-id-field id_column_name \ + --minhash-length number_of_hashes \ + --char-ngram char_ngram_size \ + --hash-bytes 4 `#or 8 byte hashes` \ + --seed 42 \ + --log-dir ./ + # --scheduler-file /path/to/file.json + +.. _fuzzydup_lsh: + +2. Buckets (Minhash Buckets) + - Input: Minhash directories + - Output: ``_buckets.parquet`` + - Example call: + + .. code-block:: bash + + # same as `python minhash_lsh.py` + minhash_buckets \ + --input-data-dirs /path/to/output_minhashes/dir1 /path/to/output_minhashes/dir2 \ + --output-bucket-dir /path/to/dedup_output \ + --input-minhash-field _minhash_signature \ + --input-json-id-field id_column_name \ + --minhash-length number_of_hashes \ + --num-bands num_bands \ + --buckets-per-shuffle 1 `#Value between [1-num_bands]. Higher is better but might lead to OOM` \ + --log-dir ./ + # --scheduler-file /path/to/file.json + +3. False Positive Check (optional): If skipping this step, proceed to the :ref:`skip fp check section `. + + a. Jaccard Map Buckets + - Input: ``_buckets.parquet`` and data directories + - Output: ``anchor_docs_with_bk.parquet`` - Example call: - .. code-block:: bash + .. code-block:: bash - # same as `python map_buckets.py` - jaccard_map_buckets \ - --input-data-dirs /path/to/jsonl/dir1 /path/to/jsonl/dir2 \ - --input-bucket-dir /path/to/dedup_output/_buckets.parquet \ - --output-dir /path/to/dedup_output \ - --input-json-text-field text_column_name \ - --input-json-id-field id_column_name \ - # --scheduler-file /path/to/file.json + # same as `python map_buckets.py` + jaccard_map_buckets \ + --input-data-dirs /path/to/jsonl/dir1 /path/to/jsonl/dir2 \ + --input-bucket-dir /path/to/dedup_output/_buckets.parquet \ + --output-dir /path/to/dedup_output \ + --input-json-text-field text_column_name \ + --input-json-id-field id_column_name + # --scheduler-file /path/to/file.json - 4. Jaccard Shuffle - - Input: anchor_docs_with_bk.parquet + Data Dir - - Output: shuffled_docs.parquet + b. Jaccard Shuffle + - Input: ``anchor_docs_with_bk.parquet`` and data directories + - Output: ``shuffled_docs.parquet`` - Example call: - .. code-block:: bash + .. 
code-block:: bash - # same as `python jaccard_shuffle.py` - jaccard_shuffle \ - --input-data-dirs /path/to/jsonl/dir1 /path/to/jsonl/dir2 \ - --input-bucket-mapping-dir /path/to/dedup_output/anchor_docs_with_bk.parquet \ - --output-dir /path/to/dedup_output \ - --input-json-text-field text_column_name \ - --input-json-id-field id_column_name \ - # --scheduler-file /path/to/file.json + # same as `python jaccard_shuffle.py` + jaccard_shuffle \ + --input-data-dirs /path/to/jsonl/dir1 /path/to/jsonl/dir2 \ + --input-bucket-mapping-dir /path/to/dedup_output/anchor_docs_with_bk.parquet \ + --output-dir /path/to/dedup_output \ + --input-json-text-field text_column_name \ + --input-json-id-field id_column_name + # --scheduler-file /path/to/file.json - 5. Jaccard compute - - Input: Shuffled docs.parquet - - Output: jaccard_similarity_results.parquet + c. Jaccard Compute + - Input: ``shuffled_docs.parquet`` + - Output: ``jaccard_similarity_results.parquet`` - Example call: - .. code-block:: bash + .. code-block:: bash - # same as `python jaccard_compute.py` - jaccard_compute \ - --shuffled-docs-path /path/to/dedup_output/shuffled_docs.parquet \ - --output-dir /path/to/dedup_output \ - --ngram-size char_ngram_size_for_similarity \ - # --scheduler-file /path/to/file.json + # same as `python jaccard_compute.py` + jaccard_compute \ + --shuffled-docs-path /path/to/dedup_output/shuffled_docs.parquet \ + --output-dir /path/to/dedup_output \ + --ngram-size char_ngram_size_for_similarity \ + --input-json-id-field id_column_name + # --scheduler-file /path/to/file.json + +.. _fuzzydup_nofp: - 6. Connected Components - - Input: jaccard_similarity_results.parquet - - Output: connected_components.parquet +3. Skipping the false positive check (more performant). This step is not needed if the false positive check was performed. + + a. Buckets to Edgelist + - Input: ``_buckets.parquet`` + - Output: ``_edges.parquet`` - Example call: - .. code-block:: bash + .. code-block:: bash - # same as `python connected_components.py` - gpu_connected_component \ - --jaccard-pairs-path /path/to/dedup_output/jaccard_similarity_results.parquet \ - --output-dir /path/to/dedup_output \ - --cache-dir /path/to/cc_cache \ - --jaccard-threshold 0.8 - # --scheduler-file /path/to/file.json + python buckets_to_edges.py \ + --input-bucket-dir /path/to/dedup_output/_buckets.parquet \ + --output-dir /path/to/dedup_output \ + --input-json-id-field id_column_name + # --scheduler-file /path/to/file.json + +4. Connected Components + - Input: ``jaccard_similarity_results.parquet`` (if you ran the false positive check) or ``_edges.parquet`` (if you skipped the false positive check) + - Output: ``connected_components.parquet`` + - Example call: + + .. code-block:: bash + + # same as `python connected_components.py` + gpu_connected_component \ + --jaccard-pairs-path /path/to/dedup_output/jaccard_similarity_results.parquet `#Or /path/to/dedup_output/_edges.parquet` \ + --output-dir /path/to/dedup_output \ + --cache-dir /path/to/cc_cache \ + --jaccard-threshold 0.8 \ + --input-json-id-field id_column_name + # --scheduler-file /path/to/file.json + +.. caution:: + The CLI utilities are limited to JSONL datasets and only work with specific ID formats. + For different dataset or ID formats, use the :ref:`fuzzydup_pyapi`. 
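+
+As noted in the Python API tip, the number of bands/buckets and hashes per bucket are chosen so that document pairs with a
+Jaccard similarity of roughly 0.8 or higher are likely to share at least one bucket. The standalone snippet below is not
+part of NeMo Curator; it simply evaluates the standard MinHash + LSH candidate-pair probability, ``1 - (1 - s**r)**b``, for
+the 20-band, 13-hashes-per-band configuration used in the Python API example, which is one way to sanity check your own
+``--num-bands`` and ``--minhash-length`` choices.
+
+.. code-block:: python
+
+    # b bands of r hashes each; matches num_buckets=20, hashes_per_bucket=13 above.
+    b, r = 20, 13
+
+    def candidate_probability(jaccard: float, bands: int = b, rows: int = r) -> float:
+        """Probability that two documents with the given Jaccard similarity
+        collide in at least one LSH bucket."""
+        return 1.0 - (1.0 - jaccard**rows) ** bands
+
+    # The curve's midpoint is approximately (1 / b) ** (1 / r) ~= 0.79 for these settings,
+    # which is where the ~0.8 similarity target comes from.
+    for s in (0.6, 0.7, 0.8, 0.9):
+        print(f"similarity {s:.1f} -> collision probability {candidate_probability(s):.2f}")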
+ +------------------------ +Incremental Fuzzy Deduplication +------------------------ -* Incremental Fuzzy Dedup - To incrementally perform fuzzy dedup, organize your incremental dataset snapshots into separate directories and pass a list of all your directories to :code:`gpu_compute_minhashes`. All other subsequent steps can be done as described above without modification. +* If any new data is added to the corpus, you will need to perform deduplication incrementally. To incrementally perform fuzzy deduplication, we do not need to recompute minhashes for datasets where minhashes were already computed. + Instead, you can organize your incremental datasets into separate directories and pass a list of all new directories to :code:`gpu_compute_minhashes`. - Input (assuming incremental snapshots are all under :code:`/input/`): @@ -195,6 +440,5 @@ steps (all scripts are included in the :code:`nemo_curator/scripts/` subdirector --log-dir ./ # --scheduler-file /path/to/file.json -In addition to the scripts, there are examples in the `examples` directory that showcase using the python module -directly in your own code. It also has examples on how to remove documents from the corpus using the list of duplicate IDs generated from exact or fuzzy -deduplication. +All subsequent steps, starting with :ref:`Buckets `, can be executed on all the data +(old and new) as described above without modification. \ No newline at end of file From dcb8a2f76aff75cd65926fa18251932fa71ad14c Mon Sep 17 00:00:00 2001 From: yury-tokpanov Date: Tue, 15 Oct 2024 10:30:43 -0700 Subject: [PATCH 3/3] Tutorial for reproducing Zyda2 dataset (#303) * Zyda2 tutorial Signed-off-by: Yury Tokpanov * Fix linter errors Signed-off-by: Yury Tokpanov --------- Signed-off-by: Yury Tokpanov Signed-off-by: Sarah Yurick --- tutorials/README.md | 1 + .../zyda2-tutorial/0_processing/helper.py | 21 ++ .../0_processing/process_dclm.py | 51 +++++ .../0_processing/process_dolma_cc.py | 29 +++ .../0_processing/process_fwe2.py | 82 ++++++++ .../0_processing/process_zyda.py | 45 +++++ .../zyda2-tutorial/1_fuzzy_dedup/0_minhash.py | 67 +++++++ .../zyda2-tutorial/1_fuzzy_dedup/1_lsh.py | 69 +++++++ .../1_fuzzy_dedup/2_buckets_to_edges.py | 48 +++++ .../1_fuzzy_dedup/3_connected_components.py | 49 +++++ .../2_dupes_removal/0_id_mapping.py | 78 ++++++++ .../2_dupes_removal/1_id_conversion.py | 65 ++++++ .../2_dupes_removal/2_compute_counts.py | 96 +++++++++ .../2_dupes_removal/3_prep_dupes.py | 186 ++++++++++++++++++ .../2_dupes_removal/4_get_dupes_dclm.py | 154 +++++++++++++++ .../2_dupes_removal/4_get_dupes_dolma-cc.py | 125 ++++++++++++ .../2_dupes_removal/5_get_dupes_zyda.py | 132 +++++++++++++ .../2_dupes_removal/remove_dupes.py | 70 +++++++ .../2_dupes_removal/run_remove_dupes_dclm.sh | 127 ++++++++++++ .../2_dupes_removal/run_remove_dupes_dolma.sh | 15 ++ .../2_dupes_removal/run_remove_dupes_zyda.sh | 78 ++++++++ .../3_quality_model/run_quality_classifier.py | 49 +++++ .../3_quality_model/run_quality_classifier.sh | 30 +++ .../zyda2-tutorial/4_filtering/filter_fwe.py | 34 ++++ .../4_filtering/filter_quality.py | 57 ++++++ .../4_filtering/run_filter_dolma.sh | 14 ++ .../4_filtering/run_filter_zyda.sh | 77 ++++++++ tutorials/zyda2-tutorial/README.md | 152 ++++++++++++++ 28 files changed, 2001 insertions(+) create mode 100644 tutorials/zyda2-tutorial/0_processing/helper.py create mode 100644 tutorials/zyda2-tutorial/0_processing/process_dclm.py create mode 100644 tutorials/zyda2-tutorial/0_processing/process_dolma_cc.py create mode 100644 
tutorials/zyda2-tutorial/0_processing/process_fwe2.py create mode 100644 tutorials/zyda2-tutorial/0_processing/process_zyda.py create mode 100644 tutorials/zyda2-tutorial/1_fuzzy_dedup/0_minhash.py create mode 100644 tutorials/zyda2-tutorial/1_fuzzy_dedup/1_lsh.py create mode 100644 tutorials/zyda2-tutorial/1_fuzzy_dedup/2_buckets_to_edges.py create mode 100644 tutorials/zyda2-tutorial/1_fuzzy_dedup/3_connected_components.py create mode 100644 tutorials/zyda2-tutorial/2_dupes_removal/0_id_mapping.py create mode 100644 tutorials/zyda2-tutorial/2_dupes_removal/1_id_conversion.py create mode 100644 tutorials/zyda2-tutorial/2_dupes_removal/2_compute_counts.py create mode 100644 tutorials/zyda2-tutorial/2_dupes_removal/3_prep_dupes.py create mode 100644 tutorials/zyda2-tutorial/2_dupes_removal/4_get_dupes_dclm.py create mode 100644 tutorials/zyda2-tutorial/2_dupes_removal/4_get_dupes_dolma-cc.py create mode 100644 tutorials/zyda2-tutorial/2_dupes_removal/5_get_dupes_zyda.py create mode 100644 tutorials/zyda2-tutorial/2_dupes_removal/remove_dupes.py create mode 100644 tutorials/zyda2-tutorial/2_dupes_removal/run_remove_dupes_dclm.sh create mode 100644 tutorials/zyda2-tutorial/2_dupes_removal/run_remove_dupes_dolma.sh create mode 100644 tutorials/zyda2-tutorial/2_dupes_removal/run_remove_dupes_zyda.sh create mode 100644 tutorials/zyda2-tutorial/3_quality_model/run_quality_classifier.py create mode 100644 tutorials/zyda2-tutorial/3_quality_model/run_quality_classifier.sh create mode 100644 tutorials/zyda2-tutorial/4_filtering/filter_fwe.py create mode 100644 tutorials/zyda2-tutorial/4_filtering/filter_quality.py create mode 100644 tutorials/zyda2-tutorial/4_filtering/run_filter_dolma.sh create mode 100644 tutorials/zyda2-tutorial/4_filtering/run_filter_zyda.sh create mode 100644 tutorials/zyda2-tutorial/README.md diff --git a/tutorials/README.md b/tutorials/README.md index e63ab5e2..84004710 100644 --- a/tutorials/README.md +++ b/tutorials/README.md @@ -27,4 +27,5 @@ To get started, we recommend starting with the following tutorials to become fam | [synthetic-preference-data](./synthetic-preference-data) | Demonstrates the use of NeMo Curator synthetic data generation modules to leverage [LLaMa 3.1 405B Instruct](https://build.nvidia.com/meta/llama-3_1-405b-instruct) for generating synthetic preference data | | [synthetic-retrieval-evaluation](./synthetic-retrieval-evaluation) | Demonstrates the use of NeMo Curator synthetic data generation modules to leverage [LLaMa 3.1 405B Instruct](https://build.nvidia.com/meta/llama-3_1-405b-instruct) for generating synthetic data to evaluate retrieval pipelines | | [tinystories](./tinystories) | A comprehensive example of curating a small dataset to use for model pre-training. | [Blog post](https://developer.nvidia.com/blog/curating-custom-datasets-for-llm-training-with-nvidia-nemo-curator/) +| [zyda2-tutorial](./zyda2-tutorial) | A comprehensive tutorial on how to reproduce Zyda2 dataset. 
| diff --git a/tutorials/zyda2-tutorial/0_processing/helper.py b/tutorials/zyda2-tutorial/0_processing/helper.py new file mode 100644 index 00000000..21b14805 --- /dev/null +++ b/tutorials/zyda2-tutorial/0_processing/helper.py @@ -0,0 +1,21 @@ +import os + +from nemo_curator import AddId +from nemo_curator.datasets import DocumentDataset +from nemo_curator.utils.file_utils import get_all_files_paths_under + + +def ensure_directory_exists(filename: str): + os.makedirs(os.path.dirname(filename), exist_ok=True) + + +def process_data(input_folder, output_folder, prefix, partition_size="512MB"): + raw_files = get_all_files_paths_under(input_folder) + raw_data = DocumentDataset.read_parquet(raw_files) + raw_data_rep = DocumentDataset( + raw_data.df.repartition(partition_size=partition_size) + ) + add_id = AddId(id_field="nemo_id", id_prefix=prefix) + data_with_id = add_id(raw_data_rep) + ensure_directory_exists(output_folder) + data_with_id.to_parquet(output_folder) diff --git a/tutorials/zyda2-tutorial/0_processing/process_dclm.py b/tutorials/zyda2-tutorial/0_processing/process_dclm.py new file mode 100644 index 00000000..bcfe6e06 --- /dev/null +++ b/tutorials/zyda2-tutorial/0_processing/process_dclm.py @@ -0,0 +1,51 @@ +import os + +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging + +from dask.distributed import Client, LocalCluster +from helper import process_data + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) + +DATA_BASE = os.environ.get("DATA_BASE") +INPUT_BASE = os.path.join(DATA_BASE, "raw/dclm-baseline-1.0-parquet/filtered") +OUTPUT_BASE = os.path.join(DATA_BASE, "processed/dclm-baseline-1.0-parquet") +CPU_WORKERS = os.environ.get("CPU_WORKERS") + + +if __name__ == "__main__": + logging.info("Starting Dask cluster") + cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, memory_limit="48GB") + client = Client(cluster) + logging.info(client) + + components = [ + "global-shard_01_of_10", + "global-shard_02_of_10", + "global-shard_03_of_10", + "global-shard_04_of_10", + "global-shard_05_of_10", + "global-shard_06_of_10", + "global-shard_07_of_10", + "global-shard_08_of_10", + "global-shard_09_of_10", + "global-shard_10_of_10", + ] + + for i, component in enumerate(components, start=1): + input_path = os.path.join(INPUT_BASE, component) + if not os.path.exists(input_path): + continue + output_path = os.path.join(OUTPUT_BASE, component) + logging.info(f"Processing {component}") + process_data( + input_folder=input_path, + output_folder=output_path, + prefix=f"dclm-gs{i}", + ) + logging.info("Done!") + + client.cluster.close() + client.shutdown() diff --git a/tutorials/zyda2-tutorial/0_processing/process_dolma_cc.py b/tutorials/zyda2-tutorial/0_processing/process_dolma_cc.py new file mode 100644 index 00000000..c4f9a287 --- /dev/null +++ b/tutorials/zyda2-tutorial/0_processing/process_dolma_cc.py @@ -0,0 +1,29 @@ +import os + +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging + +from dask.distributed import Client, LocalCluster +from helper import process_data + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) + +DATA_BASE = os.environ.get("DATA_BASE") +INPUT_BASE = os.path.join(DATA_BASE, "raw/dolma-v1_7-cc-parquet") +OUTPUT_BASE = os.path.join(DATA_BASE, "processed/dolma-v1_7-cc-parquet") +CPU_WORKERS = os.environ.get("CPU_WORKERS") + + +if __name__ == "__main__": + logging.info("Starting Dask cluster") + cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, 
memory_limit="48GB") + client = Client(cluster) + logging.info(client) + + logging.info(f"Processing Dolma-CC") + process_data(input_folder=INPUT_BASE, output_folder=OUTPUT_BASE, prefix="dolma-cc") + logging.info("Done!") + + client.cluster.close() + client.shutdown() diff --git a/tutorials/zyda2-tutorial/0_processing/process_fwe2.py b/tutorials/zyda2-tutorial/0_processing/process_fwe2.py new file mode 100644 index 00000000..a425a18e --- /dev/null +++ b/tutorials/zyda2-tutorial/0_processing/process_fwe2.py @@ -0,0 +1,82 @@ +import os + +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import ctypes +import gc +import logging +from pathlib import Path + +from dask.distributed import Client, LocalCluster +from helper import process_data + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) + +DATA_BASE = os.environ.get("DATA_BASE") +INPUT_BASE = os.path.join(DATA_BASE, "raw/fineweb-edu-score-2/data") +OUTPUT_BASE = os.path.join(DATA_BASE, "processed/fineweb-edu-score-2") +CPU_WORKERS = os.environ.get("CPU_WORKERS") + + +def trim_memory() -> int: + libc = ctypes.CDLL("libc.so.6") + return libc.malloc_trim(0) + + +def get_folder_size(folder_path): + return sum( + file.stat().st_size for file in Path(folder_path).rglob("*") if file.is_file() + ) + + +def sort_folders_by_size(parent_directory): + folders = [ + f + for f in os.listdir(parent_directory) + if os.path.isdir(os.path.join(parent_directory, f)) + ] + folder_sizes = [ + (folder, get_folder_size(os.path.join(parent_directory, folder))) + for folder in folders + ] + return sorted(folder_sizes, key=lambda x: x[1]) + + +def bytes_to_human_readable(size_in_bytes): + suffixes = ["B", "KB", "MB", "GB", "TB", "PB"] + suffix_index = 0 + size = float(size_in_bytes) + while size >= 1024 and suffix_index < len(suffixes) - 1: + size /= 1024.0 + suffix_index += 1 + return f"{size:.2f} {suffixes[suffix_index]}" + + +if __name__ == "__main__": + logging.info("Starting Dask cluster") + cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, memory_limit="240GB") + client = Client(cluster) + logging.info(client) + + components_with_sizes = sort_folders_by_size(INPUT_BASE) + + for component, component_size in components_with_sizes: + input_path = os.path.join(INPUT_BASE, component) + if not os.path.exists(input_path) or not os.path.isdir(input_path): + continue + output_path = os.path.join(OUTPUT_BASE, component) + logging.info( + f"Processing {component}, size = {bytes_to_human_readable(component_size)}" + ) + process_data( + input_folder=input_path, + output_folder=output_path, + prefix=f"fwe2-{component}", + ) + logging.info("Trimming memory") + gc.collect() + client.run(trim_memory) + logging.info("Done!") + + client.cluster.close() + client.shutdown() diff --git a/tutorials/zyda2-tutorial/0_processing/process_zyda.py b/tutorials/zyda2-tutorial/0_processing/process_zyda.py new file mode 100644 index 00000000..6cc951cd --- /dev/null +++ b/tutorials/zyda2-tutorial/0_processing/process_zyda.py @@ -0,0 +1,45 @@ +import os + +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging + +from dask.distributed import Client, LocalCluster +from helper import process_data + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) + +DATA_BASE = os.environ.get("DATA_BASE") +INPUT_BASE = os.path.join(DATA_BASE, "raw/data/zyda_no_starcoder") +OUTPUT_BASE = os.path.join(DATA_BASE, "processed/zyda-parquet") +CPU_WORKERS = os.environ.get("CPU_WORKERS") + + +if __name__ == "__main__": + 
logging.info("Starting Dask cluster") + cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, memory_limit="48GB") + client = Client(cluster) + logging.info(client) + + components = [ + "zyda_arxiv", + "zyda_peS2o", + "zyda_pile-uncopyrighted", + "zyda_slimpajama", + "zyda_c4-en", + "zyda_refinedweb", + ] + + for component in components: + input_path = os.path.join(INPUT_BASE, component) + if not os.path.exists(input_path): + continue + output_path = os.path.join(OUTPUT_BASE, component) + logging.info(f"Processing {component}") + process_data( + input_folder=input_path, output_folder=output_path, prefix=component + ) + logging.info("Done!") + + client.cluster.close() + client.shutdown() diff --git a/tutorials/zyda2-tutorial/1_fuzzy_dedup/0_minhash.py b/tutorials/zyda2-tutorial/1_fuzzy_dedup/0_minhash.py new file mode 100644 index 00000000..d3c73252 --- /dev/null +++ b/tutorials/zyda2-tutorial/1_fuzzy_dedup/0_minhash.py @@ -0,0 +1,67 @@ +import os + +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging +import time + +import dask_cudf + +from nemo_curator import MinHash +from nemo_curator.datasets import DocumentDataset +from nemo_curator.utils.distributed_utils import get_client, get_num_workers +from nemo_curator.utils.file_utils import get_all_files_paths_under + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) + + +def read_folder(input_folder, columns=["nemo_id", "text"]): + data_paths = get_all_files_paths_under(input_folder) + data_paths = [f for f in data_paths if f.endswith(".parquet")] + data_paths.sort() + logging.info(f"Number of files being read: {len(data_paths)}") + text_ddf = dask_cudf.read_parquet( + data_paths, + columns=columns, + ) + return text_ddf + + +DATA_BASE = os.environ.get("DATA_BASE") +SCHEDULER_FILE = os.environ.get("SCHEDULER_FILE") + + +if __name__ == "__main__": + client = get_client(scheduler_file=SCHEDULER_FILE) + logging.info(f"Number of dask workers: {get_num_workers(client)}") + + minhash_base_output_path = os.path.join(DATA_BASE, "fuzzy/minhash") + minhash_output_dir = os.path.join(minhash_base_output_path, "data") + + # Relevant parameters + minhash_id_field = "nemo_id" + minhash_text_field = "text" + seed = 10 + minhash_length = 128 + char_ngram = 25 + use_64bit_hash = False + + # Reading all the data + text_ddf = read_folder( + input_folder=os.path.join(DATA_BASE, "processed"), + columns=[minhash_id_field, minhash_text_field], + ) + + # Computing minhashes + t0 = time.time() + minhasher = MinHash( + seed=seed, + num_hashes=minhash_length, + char_ngrams=char_ngram, + use_64bit_hash=use_64bit_hash, + id_field=minhash_id_field, + text_field=minhash_text_field, + cache_dir=minhash_output_dir, + ) + res = minhasher(DocumentDataset(text_ddf)).df + logging.info(f"Time taken for MinHash: {time.time()-t0:.2f}sec.") diff --git a/tutorials/zyda2-tutorial/1_fuzzy_dedup/1_lsh.py b/tutorials/zyda2-tutorial/1_fuzzy_dedup/1_lsh.py new file mode 100644 index 00000000..b574c1e8 --- /dev/null +++ b/tutorials/zyda2-tutorial/1_fuzzy_dedup/1_lsh.py @@ -0,0 +1,69 @@ +import os + +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging +import time + +import cudf +import dask_cudf +import numpy as np + +from nemo_curator import LSH +from nemo_curator.datasets import DocumentDataset +from nemo_curator.utils.distributed_utils import get_client, get_num_workers +from nemo_curator.utils.fuzzy_dedup_utils.id_mapping import convert_str_id_to_int + +logging.basicConfig(format="%(asctime)s: %(message)s", 
level=logging.INFO) + + +DATA_BASE = os.environ.get("DATA_BASE") +SCHEDULER_FILE = os.environ.get("SCHEDULER_FILE") + + +if __name__ == "__main__": + client = get_client(scheduler_file=SCHEDULER_FILE) + logging.info(f"Number of dask workers: {get_num_workers(client)}") + + minhash_base_output_path = os.path.join(DATA_BASE, "fuzzy/minhash") + minhash_output_dir = os.path.join(minhash_base_output_path, "data") + + # Input + lsh_input_data_path = minhash_output_dir + + # Output + lsh_base_output_path = os.path.join(DATA_BASE, "fuzzy/lsh") + lsh_output_dir = os.path.join(lsh_base_output_path, "data") + + # Relevant parameters + lsh_id_field = "nemo_id" + minhash_field = "_minhash_signature" + minhash_length = 128 + num_bands = 8 + buckets_per_shuffle = 8 + + t0 = time.time() + + # Load MinHash output + logging.info("Converting ids") + df = dask_cudf.read_parquet(lsh_input_data_path, backend="cudf") + df = df.map_partitions( + convert_str_id_to_int, + id_column=lsh_id_field, + meta=cudf.DataFrame( + {minhash_field: [[1, 2, 3]], "doc_id": [1], "dataset_id": np.uint32(1)} + ), + ) + # Run LSH() + lsh = LSH( + cache_dir=lsh_output_dir, + num_hashes=minhash_length, + num_buckets=num_bands, + buckets_per_shuffle=buckets_per_shuffle, + id_fields=["dataset_id", "doc_id"], + minhash_field=minhash_field, + ) + res = lsh(DocumentDataset(df)) + + t1 = time.time() + logging.info(f"Time taken for LSH: {time.time() - t0} s") diff --git a/tutorials/zyda2-tutorial/1_fuzzy_dedup/2_buckets_to_edges.py b/tutorials/zyda2-tutorial/1_fuzzy_dedup/2_buckets_to_edges.py new file mode 100644 index 00000000..2ae7f408 --- /dev/null +++ b/tutorials/zyda2-tutorial/1_fuzzy_dedup/2_buckets_to_edges.py @@ -0,0 +1,48 @@ +import os + +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging +import time + +import dask_cudf + +from nemo_curator.datasets import DocumentDataset +from nemo_curator.modules.fuzzy_dedup import BucketsToEdges +from nemo_curator.utils.distributed_utils import get_client, get_num_workers + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) + + +DATA_BASE = os.environ.get("DATA_BASE") +SCHEDULER_FILE = os.environ.get("SCHEDULER_FILE") + + +if __name__ == "__main__": + client = get_client(scheduler_file=SCHEDULER_FILE) + logging.info(f"Number of dask workers: {get_num_workers(client)}") + + # Input + lsh_base_output_path = os.path.join(DATA_BASE, "fuzzy/lsh") + lsh_buckets_output_path = os.path.join( + lsh_base_output_path, "data/_buckets.parquet" + ) + + # Output + buckets_to_edges_out = os.path.join(DATA_BASE, "fuzzy/buckets_to_edges/data") + + t0 = time.time() + + ddf_bk = dask_cudf.read_parquet( + lsh_buckets_output_path, + split_row_groups=False, + ) + + buckets_to_edges = BucketsToEdges( + cache_dir=buckets_to_edges_out, + id_fields=["dataset_id", "doc_id"], + ) + + ddf_b2e = buckets_to_edges(DocumentDataset(ddf_bk)) + + logging.info(f"Time taken for Buckets to Edges: {time.time() - t0} s") diff --git a/tutorials/zyda2-tutorial/1_fuzzy_dedup/3_connected_components.py b/tutorials/zyda2-tutorial/1_fuzzy_dedup/3_connected_components.py new file mode 100644 index 00000000..67796ec4 --- /dev/null +++ b/tutorials/zyda2-tutorial/1_fuzzy_dedup/3_connected_components.py @@ -0,0 +1,49 @@ +import os + +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging +import time + +from nemo_curator.modules.fuzzy_dedup import ConnectedComponents +from nemo_curator.utils.distributed_utils import get_client, get_num_workers + 
+logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) + + +DATA_BASE = os.environ.get("DATA_BASE") +SCHEDULER_FILE = os.environ.get("SCHEDULER_FILE") + + +if __name__ == "__main__": + client = get_client(scheduler_file=SCHEDULER_FILE) + logging.info(f"Number of dask workers: {get_num_workers(client)}") + # Input + buckets_to_edges_out = os.path.join( + DATA_BASE, "fuzzy/buckets_to_edges/data/_edges.parquet" + ) + + # Output + connected_component_base_output_path = os.path.join(DATA_BASE, "fuzzy/cc") + connected_component_output_path = os.path.join( + connected_component_base_output_path, "connected_components.parquet" + ) + connected_component_cache_dir = os.path.join( + connected_component_base_output_path, "cache" + ) + + # Relevant parameters + input_id_field = "id" + + t0 = time.time() + + components_stage = ConnectedComponents( + cache_dir=connected_component_cache_dir, + jaccard_pairs_path=buckets_to_edges_out, + id_column=input_id_field, + convert_str_ids=True, + ) + + # Load and run connected components + components_stage.cc_workflow(output_path=connected_component_output_path) + logging.info(f"Time taken for Connected Components: {time.time() - t0:.2f} s") diff --git a/tutorials/zyda2-tutorial/2_dupes_removal/0_id_mapping.py b/tutorials/zyda2-tutorial/2_dupes_removal/0_id_mapping.py new file mode 100644 index 00000000..b2f95e25 --- /dev/null +++ b/tutorials/zyda2-tutorial/2_dupes_removal/0_id_mapping.py @@ -0,0 +1,78 @@ +import os + +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import json +import logging + +import cudf +import dask_cudf +import tqdm + +from nemo_curator.utils.distributed_utils import get_client, get_num_workers + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) + + +def list_deepest_folders(path): + deepest_folders = [] + + # Walk through the directory structure + for root, dirs, files in os.walk( + path, topdown=False + ): # `topdown=False` starts from the deepest folder + if not dirs: # Keep only folders that don't have subdirectories + deepest_folders.append(root) + + return deepest_folders + + +def read_data_subset(dir_paths: list): + """ + Reads 1 row from each dataset assuming each file has a unique dataset prefix + """ + dfs = [] + for dir_path in tqdm.tqdm(dir_paths): + file_name = sorted([x for x in os.listdir(dir_path) if ".parquet" in x])[0] + file_path = os.path.join(dir_path, file_name) + x = dask_cudf.read_parquet(file_path).head(1) # read 1 rows from each file + dfs.append(x) + x = cudf.concat(dfs) + return x + + +def generate_mapping(x: cudf.DataFrame): + dx = x.nemo_id.str.rsplit("-", n=1, expand=True) + x["dataset"] = dx[0] + x["dataset_id"] = x.dataset.hash_values() + mapping_df = x[["dataset", "dataset_id"]] + mapping_df = mapping_df.drop_duplicates() + mapping_df["dataset_id"] = mapping_df.dataset_id.astype(str) + dataset_id_mapping = mapping_df.set_index("dataset_id")["dataset"].to_dict() + return dataset_id_mapping + + +def convert_cc_ids(cc_df: dask_cudf.DataFrame, doc_id_mapping: dict, pad_width=10): + cc_df["doc_id"] = ( + cc_df["doc_id"].astype(str).str.pad(width=pad_width, side="left", fillchar="0") + ) + cc_df["dataset_id"] = cc_df.dataset_id.astype(str).replace(doc_id_mapping) + cc_df["original_id"] = cc_df.dataset_id + "-" + cc_df.doc_id + return cc_df[["original_id", "group"]] + + +DATA_BASE = os.environ.get("DATA_BASE") +SCHEDULER_FILE = os.environ.get("SCHEDULER_FILE") +ID_MAPPING = os.path.join(DATA_BASE, "dataset_id_mapping.json") + + +if __name__ == "__main__": + 
client = get_client(scheduler_file=SCHEDULER_FILE) + logging.info(f"Number of dask workers: {get_num_workers(client)}") + + all_folders = sorted(list_deepest_folders(os.path.join(DATA_BASE, "processed"))) + df_subset = read_data_subset(all_folders) + dataset_id_mapping = generate_mapping(df_subset) + + with open(ID_MAPPING, "w") as f: + f.write(json.dumps(dataset_id_mapping)) diff --git a/tutorials/zyda2-tutorial/2_dupes_removal/1_id_conversion.py b/tutorials/zyda2-tutorial/2_dupes_removal/1_id_conversion.py new file mode 100644 index 00000000..82b37cb3 --- /dev/null +++ b/tutorials/zyda2-tutorial/2_dupes_removal/1_id_conversion.py @@ -0,0 +1,65 @@ +import json +import os + +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging + +import dask.dataframe as dd +from dask.distributed import Client, LocalCluster + +from nemo_curator.utils.distributed_utils import get_num_workers + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) + +DATA_BASE = os.environ.get("DATA_BASE") +CC_BASE = os.path.join(DATA_BASE, "fuzzy/cc/") +CC_FOLDER = os.path.join(CC_BASE, "connected_components.parquet") +CC_CONVERTED_FOLDER = os.path.join(CC_BASE, "connected_components_converted.parquet") +ID_MAPPING = os.path.join(DATA_BASE, "dataset_id_mapping.json") +CPU_WORKERS = os.environ.get("CPU_WORKERS") + + +if __name__ == "__main__": + cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True) + client = Client(cluster) + logging.info(f"Number of dask workers: {get_num_workers(client)}") + + cc_df = dd.read_parquet(CC_FOLDER, split_row_groups=False) + + with open(ID_MAPPING, "r") as f: + dataset_id_mapping = json.loads(f.read()) + + global_dataset_id_mapping = {} + for key, val in dataset_id_mapping.items(): + if "dclm" in val: + global_dataset_id_mapping[key] = "dclm" + elif "fwe2" in val: + global_dataset_id_mapping[key] = "fwe2" + elif "zyda" in val: + global_dataset_id_mapping[key] = "zyda" + elif "dolma-cc" in val: + global_dataset_id_mapping[key] = "dolma-cc" + else: + print(f"Unknown value {val} for key {key}") + + def convert_cc_ids( + cc_df, dataset_id_mapping, global_dataset_id_mapping, doc_id_len=10 + ): + cc_df["global_dataset_id"] = cc_df.dataset_id.astype(str).replace( + global_dataset_id_mapping + ) + cc_df["dataset_id"] = cc_df.dataset_id.astype(str).replace(dataset_id_mapping) + cc_df["doc_id"] = ( + cc_df["doc_id"] + .astype(str) + .str.pad(width=doc_id_len, side="left", fillchar="0") + ) + cc_df["original_id"] = cc_df.dataset_id + "-" + cc_df.doc_id + return cc_df[["global_dataset_id", "dataset_id", "original_id", "group"]] + + cc_df_converted = convert_cc_ids( + cc_df, dataset_id_mapping, global_dataset_id_mapping + ) + cc_df_converted.to_parquet(CC_CONVERTED_FOLDER, overwrite=True, write_index=False) + logging.info("Done converting!") diff --git a/tutorials/zyda2-tutorial/2_dupes_removal/2_compute_counts.py b/tutorials/zyda2-tutorial/2_dupes_removal/2_compute_counts.py new file mode 100644 index 00000000..c59d0f89 --- /dev/null +++ b/tutorials/zyda2-tutorial/2_dupes_removal/2_compute_counts.py @@ -0,0 +1,96 @@ +import json +import os +import time + +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging + +import dask.dataframe as dd +import pandas as pd +from dask.distributed import Client, LocalCluster + +from nemo_curator.utils.distributed_utils import get_num_workers + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) + +DATA_BASE = os.environ.get("DATA_BASE") +CC_BASE = os.path.join(DATA_BASE, 
"fuzzy/cc/") +CC_FOLDER = os.path.join(CC_BASE, "connected_components.parquet") +CC_CONVERTED_FOLDER = os.path.join(CC_BASE, "connected_components_converted.parquet") +CC_GROUPED_FOLDER = os.path.join(CC_BASE, "connected_components_grouped.parquet") +CC_GROUPED_COUNTS_FOLDER = os.path.join( + CC_BASE, "connected_components_grouped_counts.parquet" +) +CPU_WORKERS = os.environ.get("CPU_WORKERS") + + +if __name__ == "__main__": + cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True) + client = Client(cluster) + logging.info(f"Number of dask workers: {get_num_workers(client)}") + + cc_df_converted = dd.read_parquet(CC_CONVERTED_FOLDER, split_row_groups=False) + + logging.info("Grouping by cluster id") + t0 = time.time() + + def group_partition(partition): + sizes = partition.groupby("group").size().reset_index() + + grouped = ( + partition.groupby("group") + .agg( + { + "global_dataset_id": lambda x: json.dumps(list(x)), + "dataset_id": lambda x: json.dumps(list(x)), + "original_id": lambda x: json.dumps(list(x)), + } + ) + .reset_index() + ) + + result = pd.merge(sizes, grouped, on="group") + + return result[ + ["group", "global_dataset_id", "dataset_id", "original_id", "size"] + ] + + meta = { + "group": int, + "global_dataset_id": str, + "dataset_id": str, + "original_id": str, + "size": int, + } + cc_df_grouped = cc_df_converted.map_partitions(group_partition, meta=meta) + cc_df_grouped.to_parquet(CC_GROUPED_FOLDER, write_index=False, overwrite=True) + print(f"Done grouping in {time.time() - t0:.2f} sec") + + logging.info("Computing counts") + t0 = time.time() + global_dataset_ids = ["dclm", "fwe2", "dolma-cc", "zyda"] + + def count_occurrences_in_partition(partition): + for id in global_dataset_ids: + partition[id] = partition["global_dataset_id"].apply( + lambda x: json.loads(x).count(id) + ) + return partition + + meta = { + "group": "int", + "global_dataset_id": "str", + "dataset_id": "str", + "original_id": "str", + "size": "int", + } + for id in global_dataset_ids: + meta[id] = "int" + cc_grouped_counts_df = cc_df_grouped.map_partitions( + count_occurrences_in_partition, + meta=meta, + ) + cc_grouped_counts_df.to_parquet(CC_GROUPED_COUNTS_FOLDER, overwrite=True) + + logging.info(f"Done computing counts in {time.time() - t0:.2f} sec") diff --git a/tutorials/zyda2-tutorial/2_dupes_removal/3_prep_dupes.py b/tutorials/zyda2-tutorial/2_dupes_removal/3_prep_dupes.py new file mode 100644 index 00000000..d9d30f36 --- /dev/null +++ b/tutorials/zyda2-tutorial/2_dupes_removal/3_prep_dupes.py @@ -0,0 +1,186 @@ +import json +import os + +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging + +import dask.dataframe as dd +import pandas as pd +from dask.distributed import Client, LocalCluster + +from nemo_curator.utils.distributed_utils import get_num_workers +from nemo_curator.utils.module_utils import count_digits + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) + +DATA_BASE = os.environ.get("DATA_BASE") +RAW_DATA_BASE = os.path.join(DATA_BASE, "processed") +CC_BASE = os.path.join(DATA_BASE, "fuzzy/cc/") +CC_FOLDER = os.path.join(CC_BASE, "connected_components.parquet") +CC_GROUPED_COUNTS_FOLDER = os.path.join( + CC_BASE, "connected_components_grouped_counts.parquet" +) + +DUPES_BASE = os.path.join(CC_BASE, "dupes") +DUPES_IDS_GROUPED_IN_COLUMNS = os.path.join( + DUPES_BASE, "dupes_ids_grouped_in_columns.parquet" +) + +CPU_WORKERS = os.environ.get("CPU_WORKERS") + + +if __name__ == "__main__": + cluster = LocalCluster(n_workers=CPU_WORKERS, 
processes=True) + client = Client(cluster) + logging.info(f"Number of dask workers: {get_num_workers(client)}") + + paths = { + "dclm": os.path.join(RAW_DATA_BASE, "dclm-baseline-1.0-parquet/filtered"), + "dolma-cc": os.path.join(RAW_DATA_BASE, "dolma-v1_7-cc-parquet"), + "fwe2": os.path.join(RAW_DATA_BASE, "fineweb-edu-score-2/data"), + "zyda": os.path.join(RAW_DATA_BASE, "data/zyda_no_starcoder"), + } + + dclm_id2dir = { + "gs1": "global-shard_01_of_10", + "gs2": "global-shard_02_of_10", + "gs3": "global-shard_03_of_10", + "gs4": "global-shard_04_of_10", + "gs5": "global-shard_05_of_10", + "gs6": "global-shard_06_of_10", + "gs7": "global-shard_07_of_10", + "gs8": "global-shard_08_of_10", + "gs9": "global-shard_09_of_10", + "gs10": "global-shard_10_of_10", + } + + dclm_dir2id = {} + for key, val in dclm_id2dir.items(): + dclm_dir2id[val] = key + + # Counting digits + dclm_digits = {} + for dir in sorted(os.listdir(paths["dclm"])): + files = [ + x for x in os.listdir(os.path.join(paths["dclm"], dir)) if ".parquet" in x + ] + dclm_digits[dclm_dir2id[dir]] = count_digits(len(files)) + + dolma_digits = count_digits( + len([x for x in os.listdir(paths["dolma-cc"]) if ".parquet" in x]) + ) + + zyda_digits = {} + for dir in sorted(os.listdir(paths["zyda"])): + files = [ + x for x in os.listdir(os.path.join(paths["zyda"], dir)) if ".parquet" in x + ] + zyda_digits[dir] = count_digits(len(files)) + + fwe2_digits = {} + for dir in sorted(os.listdir(paths["fwe2"])): + files = [ + x for x in os.listdir(os.path.join(paths["fwe2"], dir)) if ".parquet" in x + ] + fwe2_digits[dir] = count_digits(len(files)) + + cc_grouped_counts_df = dd.read_parquet( + CC_GROUPED_COUNTS_FOLDER, split_row_groups=False + ) + cc_grouped_counts_filtered_df = cc_grouped_counts_df[ + cc_grouped_counts_df["size"] > 1 + ] + + cc_groups_counts_inter_df = cc_grouped_counts_filtered_df[ + cc_grouped_counts_filtered_df["size"] != cc_grouped_counts_filtered_df["dclm"] + ] + cc_groups_counts_inter_df = cc_groups_counts_inter_df[ + cc_groups_counts_inter_df["size"] != cc_groups_counts_inter_df["fwe2"] + ] + cc_groups_counts_inter_df = cc_groups_counts_inter_df[ + cc_groups_counts_inter_df["size"] != cc_groups_counts_inter_df["dolma-cc"] + ] + cc_groups_counts_inter_df = cc_groups_counts_inter_df[ + cc_groups_counts_inter_df["size"] != cc_groups_counts_inter_df["zyda"] + ] + + def select_dupes(partition): + # Removes all overlaps with fwe2 + partition_fwe2 = partition[partition["fwe2"] > 0] + partition_fwe2["dupes_to_remove"] = partition_fwe2["original_id"].apply( + lambda x: json.dumps([id for id in json.loads(x) if "fwe2" not in id]) + ) + + # Removes all overlaps with fwe2 (after fwe2 overlaps are removed) + partition_dclm = partition[partition["fwe2"] == 0] + partition_dclm = partition_dclm[partition_dclm["dclm"] > 0] + partition_dclm["dupes_to_remove"] = partition_dclm["original_id"].apply( + lambda x: json.dumps([id for id in json.loads(x) if "dclm" not in id]) + ) + + # Removes all overlaps with zyda (after dclm, fwe2 overlaps are removed) + partition_zyda = partition[partition["fwe2"] == 0] + partition_zyda = partition_zyda[partition_zyda["dclm"] == 0] + partition_zyda = partition_zyda[partition_zyda["zyda"] > 0] + partition_zyda["dupes_to_remove"] = partition_zyda["original_id"].apply( + lambda x: json.dumps([id for id in json.loads(x) if "zyda" not in id]) + ) + + return pd.concat([partition_dclm, partition_fwe2, partition_zyda]) + + meta = { + "group": int, + "global_dataset_id": str, + "dataset_id": str, + "original_id": 
str, + "size": int, + "dclm": int, + "fwe2": int, + "dolma-cc": int, + "zyda": int, + "dupes_to_remove": str, + } + dupes_df = cc_groups_counts_inter_df.map_partitions(select_dupes, meta=meta) + + def group_dupes(partition): + partition["dclm_dupes"] = partition["dupes_to_remove"].apply( + lambda x: json.dumps([id for id in json.loads(x) if "dclm" in id]) + ) + partition["zyda_dupes"] = partition["dupes_to_remove"].apply( + lambda x: json.dumps([id for id in json.loads(x) if "zyda" in id]) + ) + partition["dolma_dupes"] = partition["dupes_to_remove"].apply( + lambda x: json.dumps([id for id in json.loads(x) if "dolma" in id]) + ) + + return partition[ + [ + "group", + "size", + "dclm", + "fwe2", + "dolma-cc", + "zyda", + "dclm_dupes", + "zyda_dupes", + "dolma_dupes", + ] + ] + + meta = { + "group": int, + "size": int, + "dclm": int, + "fwe2": int, + "dolma-cc": int, + "zyda": int, + "dclm_dupes": str, + "zyda_dupes": str, + "dolma_dupes": str, + } + + grouped_dupes_df = dupes_df.map_partitions(group_dupes, meta=meta) + grouped_dupes_df.to_parquet( + DUPES_IDS_GROUPED_IN_COLUMNS, write_index=False, overwrite=True + ) diff --git a/tutorials/zyda2-tutorial/2_dupes_removal/4_get_dupes_dclm.py b/tutorials/zyda2-tutorial/2_dupes_removal/4_get_dupes_dclm.py new file mode 100644 index 00000000..6de77cd3 --- /dev/null +++ b/tutorials/zyda2-tutorial/2_dupes_removal/4_get_dupes_dclm.py @@ -0,0 +1,154 @@ +import json +import os + +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging + +import dask.dataframe as dd +from dask.distributed import Client, LocalCluster + +from nemo_curator.utils.distributed_utils import get_num_workers +from nemo_curator.utils.module_utils import count_digits + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) + +DATA_BASE = os.environ.get("DATA_BASE") +RAW_DATA_BASE = os.path.join(DATA_BASE, "processed") +CC_BASE = os.path.join(DATA_BASE, "fuzzy/cc/") + +DUPES_BASE = os.path.join(CC_BASE, "dupes") +DUPES_IDS_GROUPED_IN_COLUMNS = os.path.join( + DUPES_BASE, "dupes_ids_grouped_in_columns.parquet" +) + +DCLM_EXPLODED = os.path.join(DUPES_BASE, "dupes_dclm_exploded.parquet") +DUPES_DCLM_TO_REMOVE = os.path.join(DUPES_BASE, "dupes_dclm_to_remove.jsonl") + +CPU_WORKERS = os.environ.get("CPU_WORKERS") + + +if __name__ == "__main__": + cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True) + client = Client(cluster) + logging.info(f"Number of dask workers: {get_num_workers(client)}") + + paths = { + "dclm": os.path.join(RAW_DATA_BASE, "dclm-baseline-1.0-parquet/filtered"), + "dolma-cc": os.path.join(RAW_DATA_BASE, "dolma-v1_7-cc-parquet"), + "fwe2": os.path.join(RAW_DATA_BASE, "fineweb-edu-score-2/data"), + "zyda": os.path.join(RAW_DATA_BASE, "data/zyda_no_starcoder"), + } + + dclm_id2dir = { + "gs1": "global-shard_01_of_10", + "gs2": "global-shard_02_of_10", + "gs3": "global-shard_03_of_10", + "gs4": "global-shard_04_of_10", + "gs5": "global-shard_05_of_10", + "gs6": "global-shard_06_of_10", + "gs7": "global-shard_07_of_10", + "gs8": "global-shard_08_of_10", + "gs9": "global-shard_09_of_10", + "gs10": "global-shard_10_of_10", + } + + dclm_dir2id = {} + for key, val in dclm_id2dir.items(): + dclm_dir2id[val] = key + + # Counting digits + dclm_digits = {} + for dir in sorted(os.listdir(paths["dclm"])): + files = [ + x for x in os.listdir(os.path.join(paths["dclm"], dir)) if ".parquet" in x + ] + dclm_digits[dclm_dir2id[dir]] = count_digits(len(files)) + + # Processing DCLM dupes + grouped_dupes_df = dd.read_parquet( + 
DUPES_IDS_GROUPED_IN_COLUMNS, split_row_groups=False + ) + dclm_df = grouped_dupes_df[grouped_dupes_df["dclm_dupes"] != "[]"] + + def decode_and_explode(partition, column): + # Decode JSON strings to lists + partition["id_list"] = partition[column].apply(json.loads) + # Explode the lists + return partition.explode("id_list")[["group", "id_list"]] + + meta = { + "group": int, + "id_list": str, + } + dclm_exploded_df = dclm_df.map_partitions( + decode_and_explode, "dclm_dupes", meta=meta + ).reset_index(drop=True) + dclm_exploded_df = dclm_exploded_df.rename(columns={"id_list": "id"}) + + def split_id(df, id_column="id"): + dx = df[id_column].str.rsplit("-", n=1, expand=True) + df["doc_id"] = dx[1].astype("str") + dy = dx[0].str.split("-", n=1, expand=True) + df["global_dataset_id"] = dy[0].astype("str") + df["folder"] = dy[1].astype("str") + return df + + meta = { + "group": int, + "id": str, + "doc_id": str, + "global_dataset_id": str, + "folder": str, + } + dclm_exploded_df = dclm_exploded_df.map_partitions(split_id, meta=meta) + + def split_doc_id(df): + df["row"] = df.apply( + lambda x: int(x["doc_id"][: -dclm_digits[x["folder"]]]), axis=1 + ) + df["partition"] = df.apply( + lambda x: int(x["doc_id"][-dclm_digits[x["folder"]] :]), axis=1 + ) + return df + + meta = { + "group": int, + "id": str, + "doc_id": str, + "global_dataset_id": str, + "folder": str, + "row": int, + "partition": int, + } + dclm_exploded_df = dclm_exploded_df.map_partitions(split_doc_id, meta=meta) + dclm_exploded_df.to_parquet(DCLM_EXPLODED, write_index=False, overwrite=True) + + dclm_exploded_df = dd.read_parquet(DCLM_EXPLODED, split_row_groups=False) + + def write_group_to_jsonl(group): + folder_id, partition = group.name + + zipped = zip(list(group["row"]), list(group["group"]), list(group["id"])) + + zipped = sorted(zipped, key=lambda x: x[0]) + + rows, groups, ids = list(zip(*zipped)) + + partition_dict = { + "rows": rows, + "groups": groups, + "ids": ids, + } + + # Writing rows + file_path = os.path.join( + DUPES_DCLM_TO_REMOVE, dclm_id2dir[folder_id], f"{partition}.jsonl" + ) + os.makedirs(os.path.dirname(file_path), exist_ok=True) + with open(file_path, "w") as f: + f.write(json.dumps(partition_dict)) + + dclm_exploded_df.groupby(["folder", "partition"]).apply( + write_group_to_jsonl, meta=() + ).compute() diff --git a/tutorials/zyda2-tutorial/2_dupes_removal/4_get_dupes_dolma-cc.py b/tutorials/zyda2-tutorial/2_dupes_removal/4_get_dupes_dolma-cc.py new file mode 100644 index 00000000..f6749c57 --- /dev/null +++ b/tutorials/zyda2-tutorial/2_dupes_removal/4_get_dupes_dolma-cc.py @@ -0,0 +1,125 @@ +import json +import os + +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging + +import dask.dataframe as dd +from dask.distributed import Client, LocalCluster + +from nemo_curator.utils.distributed_utils import get_num_workers +from nemo_curator.utils.module_utils import count_digits + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) + +DATA_BASE = os.environ.get("DATA_BASE") +RAW_DATA_BASE = os.path.join(DATA_BASE, "processed") +CC_BASE = os.path.join(DATA_BASE, "fuzzy/cc/") + +DUPES_BASE = os.path.join(CC_BASE, "dupes") +DUPES_IDS_GROUPED_IN_COLUMNS = os.path.join( + DUPES_BASE, "dupes_ids_grouped_in_columns.parquet" +) + +DOLMA_EXPLODED = os.path.join(DUPES_BASE, "dupes_dolma_exploded.parquet") +DUPES_DOLMA_TO_REMOVE = os.path.join(DUPES_BASE, "dupes_dolma_to_remove.jsonl") + +CPU_WORKERS = os.environ.get("CPU_WORKERS") + + +if __name__ == "__main__": + cluster = 
LocalCluster(n_workers=CPU_WORKERS, processes=True) + client = Client(cluster) + logging.info(f"Number of dask workers: {get_num_workers(client)}") + + paths = { + "dclm": os.path.join(RAW_DATA_BASE, "dclm-baseline-1.0-parquet/filtered"), + "dolma-cc": os.path.join(RAW_DATA_BASE, "dolma-v1_7-cc-parquet"), + "fwe2": os.path.join(RAW_DATA_BASE, "fineweb-edu-score-2/data"), + "zyda": os.path.join(RAW_DATA_BASE, "data/zyda_no_starcoder"), + } + + # Counting digits + dolma_digits = count_digits( + len([x for x in os.listdir(paths["dolma-cc"]) if ".parquet" in x]) + ) + + # Processing DCLM dupes + grouped_dupes_df = dd.read_parquet( + DUPES_IDS_GROUPED_IN_COLUMNS, split_row_groups=False + ) + dolma_df = grouped_dupes_df[grouped_dupes_df["dolma_dupes"] != "[]"][ + ["group", "size", "dclm", "fwe2", "zyda", "dolma-cc", "dolma_dupes"] + ] + + def decode_and_explode(partition, column): + partition["id_list"] = partition[column].apply(json.loads) + return partition.explode("id_list")[["group", "id_list"]] + + # Step 2: Apply the function and reset the index + meta = { + "group": int, + "id_list": str, + } + dolma_exploded_df = dolma_df.map_partitions( + decode_and_explode, "dolma_dupes", meta=meta + ).reset_index(drop=True) + dolma_exploded_df = dolma_exploded_df.rename(columns={"id_list": "id"}) + + def split_id(df, id_column="id"): + dx = df[id_column].str.rsplit("-", n=1, expand=True) + df["doc_id"] = dx[1].astype("str") + df["dataset_id"] = dx[0].astype("str") + return df + + meta = { + "group": int, + "id": str, + "doc_id": str, + "dataset_id": str, + } + dolma_exploded_df = dolma_exploded_df.map_partitions(split_id, meta=meta) + + def split_doc_id(df): + df["row"] = df.apply(lambda x: int(x["doc_id"][:-dolma_digits]), axis=1) + df["partition"] = df.apply(lambda x: int(x["doc_id"][-dolma_digits:]), axis=1) + return df + + meta = { + "group": int, + "id": str, + "doc_id": str, + "dataset_id": str, + "row": int, + "partition": int, + } + dolma_exploded_df = dolma_exploded_df.map_partitions(split_doc_id, meta=meta) + dolma_exploded_df.to_parquet(DOLMA_EXPLODED, write_index=False, overwrite=True) + + dolma_exploded_df = dd.read_parquet(DOLMA_EXPLODED, split_row_groups=False) + + def write_group_to_jsonl(group): + partition = group.name + + zipped = zip(list(group["row"]), list(group["group"]), list(group["id"])) + + zipped = sorted(zipped, key=lambda x: x[0]) + + rows, groups, ids = list(zip(*zipped)) + + partition_dict = { + "rows": rows, + "groups": groups, + "ids": ids, + } + + # Writing rows + file_path = os.path.join(DUPES_DOLMA_TO_REMOVE, f"{partition}.jsonl") + os.makedirs(os.path.dirname(file_path), exist_ok=True) + with open(file_path, "w") as f: + f.write(json.dumps(partition_dict)) + + dolma_exploded_df.groupby(["partition"]).apply( + write_group_to_jsonl, meta=() + ).compute() diff --git a/tutorials/zyda2-tutorial/2_dupes_removal/5_get_dupes_zyda.py b/tutorials/zyda2-tutorial/2_dupes_removal/5_get_dupes_zyda.py new file mode 100644 index 00000000..cabb471e --- /dev/null +++ b/tutorials/zyda2-tutorial/2_dupes_removal/5_get_dupes_zyda.py @@ -0,0 +1,132 @@ +import json +import os + +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging + +import dask.dataframe as dd +from dask.distributed import Client, LocalCluster + +from nemo_curator.utils.distributed_utils import get_num_workers +from nemo_curator.utils.module_utils import count_digits + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) + +DATA_BASE = os.environ.get("DATA_BASE") +RAW_DATA_BASE 
= os.path.join(DATA_BASE, "processed") +CC_BASE = os.path.join(DATA_BASE, "fuzzy/cc/") + +DUPES_BASE = os.path.join(CC_BASE, "dupes") +DUPES_IDS_GROUPED_IN_COLUMNS = os.path.join( + DUPES_BASE, "dupes_ids_grouped_in_columns.parquet" +) + +ZYDA_EXPLODED = os.path.join(DUPES_BASE, "dupes_zyda_exploded.parquet") +DUPES_ZYDA_TO_REMOVE = os.path.join(DUPES_BASE, "dupes_zyda_to_remove.jsonl") + +CPU_WORKERS = os.environ.get("CPU_WORKERS") + + +if __name__ == "__main__": + cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True) + client = Client(cluster) + logging.info(f"Number of dask workers: {get_num_workers(client)}") + + paths = { + "dclm": os.path.join(RAW_DATA_BASE, "dclm-baseline-1.0-parquet/filtered"), + "dolma-cc": os.path.join(RAW_DATA_BASE, "dolma-v1_7-cc-parquet"), + "fwe2": os.path.join(RAW_DATA_BASE, "fineweb-edu-score-2/data"), + "zyda": os.path.join(RAW_DATA_BASE, "data/zyda_no_starcoder"), + } + + # Counting digits + zyda_digits = {} + for dir in sorted(os.listdir(paths["zyda"])): + files = [ + x for x in os.listdir(os.path.join(paths["zyda"], dir)) if ".parquet" in x + ] + zyda_digits[dir] = count_digits(len(files)) + + grouped_dupes_df = dd.read_parquet( + DUPES_IDS_GROUPED_IN_COLUMNS, split_row_groups=False + ) + zyda_df = grouped_dupes_df[grouped_dupes_df["zyda_dupes"] != "[]"][ + ["group", "size", "dclm", "fwe2", "zyda", "dolma-cc", "zyda_dupes"] + ] + + def decode_and_explode(partition, column): + partition["id_list"] = partition[column].apply(json.loads) + return partition.explode("id_list")[["group", "id_list"]] + + meta = { + "group": int, + "id_list": str, + } + zyda_exploded_df = zyda_df.map_partitions( + decode_and_explode, "zyda_dupes", meta=meta + ).reset_index(drop=True) + zyda_exploded_df = zyda_exploded_df.rename(columns={"id_list": "id"}) + + def split_id(df, id_column="id"): + dx = df[id_column].str.rsplit("-", n=1, expand=True) + df["doc_id"] = dx[1].astype("str") + df["folder"] = dx[0].astype("str") + return df + + meta = { + "group": int, + "id": str, + "doc_id": str, + "folder": str, + } + zyda_exploded_df = zyda_exploded_df.map_partitions(split_id, meta=meta) + + def split_doc_id(df): + df["row"] = df.apply( + lambda x: int(x["doc_id"][: -zyda_digits[x["folder"]]]), axis=1 + ) + df["partition"] = df.apply( + lambda x: int(x["doc_id"][-zyda_digits[x["folder"]] :]), axis=1 + ) + return df + + meta = { + "group": int, + "id": str, + "doc_id": str, + "folder": str, + "row": int, + "partition": int, + } + zyda_exploded_df = zyda_exploded_df.map_partitions(split_doc_id, meta=meta) + zyda_exploded_df.to_parquet(ZYDA_EXPLODED, write_index=False, overwrite=True) + + zyda_exploded_df = dd.read_parquet(ZYDA_EXPLODED, split_row_groups=False) + + def write_group_to_jsonl(group): + folder_name, partition = group.name + + zipped = zip(list(group["row"]), list(group["group"]), list(group["id"])) + + zipped = sorted(zipped, key=lambda x: x[0]) + + rows, groups, ids = list(zip(*zipped)) + + partition_dict = { + "rows": rows, + "groups": groups, + "ids": ids, + } + + # Writing rows + file_path = os.path.join( + DUPES_ZYDA_TO_REMOVE, folder_name, f"{partition}.jsonl" + ) + os.makedirs(os.path.dirname(file_path), exist_ok=True) + with open(file_path, "w") as f: + f.write(json.dumps(partition_dict)) + + zyda_exploded_df.groupby(["folder", "partition"]).apply( + write_group_to_jsonl, meta=() + ).compute() diff --git a/tutorials/zyda2-tutorial/2_dupes_removal/remove_dupes.py b/tutorials/zyda2-tutorial/2_dupes_removal/remove_dupes.py new file mode 100644 index 
00000000..16238c79 --- /dev/null +++ b/tutorials/zyda2-tutorial/2_dupes_removal/remove_dupes.py @@ -0,0 +1,70 @@ +import argparse +import json +import os +import time + +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging + +import dask.dataframe as dd +from dask.distributed import Client, LocalCluster + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run NeMo quality classifier.") + parser.add_argument( + "--dupes-path", + type=str, + required=True, + help="Path to the folder with dupes indices", + ) + parser.add_argument( + "--input", type=str, required=True, help="Path to the folder with input dataset" + ) + parser.add_argument( + "--output", type=str, required=True, help="Path to where write the result" + ) + parser.add_argument( + "--n-workers", type=int, default=64, help="Number of CPU Dask workers" + ) + args = parser.parse_args() + + t0 = time.time() + cluster = LocalCluster( + n_workers=args.n_workers, threads_per_worker=2, processes=True + ) + client = Client(cluster) + logging.info(f"Dask client: {client}") + logging.info(f"Dashboard link: {client.dashboard_link}") + + dupes_files = sorted([x for x in os.listdir(args.dupes_path) if ".jsonl" in x]) + ind_2_fullpath = {} + for file in dupes_files: + ind = int(file.split(".")[0]) + ind_2_fullpath[ind] = os.path.join(args.dupes_path, file) + logging.info(f"Found {len(dupes_files)} files with dupes") + + ddf = dd.read_parquet(args.input, split_row_groups=False) + + def drop_dupes(partition, partition_info=None): + ind = partition_info["number"] + + dupes_file_path = ind_2_fullpath.get(ind, None) + if not dupes_file_path: + return partition + + with open(dupes_file_path, "r") as f: + dupe_inds = json.loads(f.read())["rows"] + partition_deduped = partition.drop(index=dupe_inds) + return partition_deduped + + meta = ddf.dtypes.to_dict() + ddf_deduped = ddf.map_partitions(drop_dupes, meta=meta) + logging.info(f"Removing dupes...") + ddf_deduped.to_parquet(args.output, write_index=False, overwrite=True) + logging.info(f"Done in {time.time() - t0:.2f}sec") + + client.cluster.close() + client.shutdown() diff --git a/tutorials/zyda2-tutorial/2_dupes_removal/run_remove_dupes_dclm.sh b/tutorials/zyda2-tutorial/2_dupes_removal/run_remove_dupes_dclm.sh new file mode 100644 index 00000000..ba819fd8 --- /dev/null +++ b/tutorials/zyda2-tutorial/2_dupes_removal/run_remove_dupes_dclm.sh @@ -0,0 +1,127 @@ +#!/bin/bash + +IN_BASE=$DATA_BASE/processed/dclm-baseline-1.0-parquet +OUT_BASE=$DATA_BASE/zyda2/dclm-crossdeduped +DUPES_BASE=$DATA_BASE/fuzzy/cc/dupes/dupes_dclm_to_remove.jsonl +N_WORKERS=$CPU_WORKERS + + +NAME=global-shard_01_of_10 +INPUT=$IN_BASE/$NAME +OUTPUT=$OUT_BASE/$NAME +if test -d $INPUT; then + echo "Processing $NAME" + python remove_dupes.py \ + --dupes-path $DUPES_BASE/$NAME \ + --input $INPUT \ + --output $OUTPUT \ + --n-workers $N_WORKERS +fi + +NAME=global-shard_02_of_10 +INPUT=$IN_BASE/$NAME +OUTPUT=$OUT_BASE/$NAME +if test -d $INPUT; then + echo "Processing $NAME" + python remove_dupes.py \ + --dupes-path $DUPES_BASE/$NAME \ + --input $INPUT \ + --output $OUTPUT \ + --n-workers $N_WORKERS +fi + +NAME=global-shard_03_of_10 +INPUT=$IN_BASE/$NAME +OUTPUT=$OUT_BASE/$NAME +if test -d $INPUT; then + echo "Processing $NAME" + python remove_dupes.py \ + --dupes-path $DUPES_BASE/$NAME \ + --input $INPUT \ + --output $OUTPUT \ + --n-workers $N_WORKERS +fi + +NAME=global-shard_04_of_10 +INPUT=$IN_BASE/$NAME 
+OUTPUT=$OUT_BASE/$NAME +if test -d $INPUT; then + echo "Processing $NAME" + python remove_dupes.py \ + --dupes-path $DUPES_BASE/$NAME \ + --input $INPUT \ + --output $OUTPUT \ + --n-workers $N_WORKERS +fi + +NAME=global-shard_05_of_10 +INPUT=$IN_BASE/$NAME +OUTPUT=$OUT_BASE/$NAME +if test -d $INPUT; then + echo "Processing $NAME" + python remove_dupes.py \ + --dupes-path $DUPES_BASE/$NAME \ + --input $INPUT \ + --output $OUTPUT \ + --n-workers $N_WORKERS +fi + +NAME=global-shard_06_of_10 +INPUT=$IN_BASE/$NAME +OUTPUT=$OUT_BASE/$NAME +if test -d $INPUT; then + echo "Processing $NAME" + python remove_dupes.py \ + --dupes-path $DUPES_BASE/$NAME \ + --input $INPUT \ + --output $OUTPUT \ + --n-workers $N_WORKERS +fi + +NAME=global-shard_07_of_10 +INPUT=$IN_BASE/$NAME +OUTPUT=$OUT_BASE/$NAME +if test -d $INPUT; then + echo "Processing $NAME" + python remove_dupes.py \ + --dupes-path $DUPES_BASE/$NAME \ + --input $INPUT \ + --output $OUTPUT \ + --n-workers $N_WORKERS +fi + +NAME=global-shard_08_of_10 +INPUT=$IN_BASE/$NAME +OUTPUT=$OUT_BASE/$NAME +if test -d $INPUT; then + echo "Processing $NAME" + python remove_dupes.py \ + --dupes-path $DUPES_BASE/$NAME \ + --input $INPUT \ + --output $OUTPUT \ + --n-workers $N_WORKERS +fi + +NAME=global-shard_09_of_10 +INPUT=$IN_BASE/$NAME +OUTPUT=$OUT_BASE/$NAME +if test -d $INPUT; then + echo "Processing $NAME" + python remove_dupes.py \ + --dupes-path $DUPES_BASE/$NAME \ + --input $INPUT \ + --output $OUTPUT \ + --n-workers $N_WORKERS +fi + +NAME=global-shard_10_of_10 +INPUT=$IN_BASE/$NAME +OUTPUT=$OUT_BASE/$NAME +if test -d $INPUT; then + echo "Processing $NAME" + python remove_dupes.py \ + --dupes-path $DUPES_BASE/$NAME \ + --input $INPUT \ + --output $OUTPUT \ + --n-workers $N_WORKERS +fi diff --git a/tutorials/zyda2-tutorial/2_dupes_removal/run_remove_dupes_dolma.sh b/tutorials/zyda2-tutorial/2_dupes_removal/run_remove_dupes_dolma.sh new file mode 100644 index 00000000..01891b70 --- /dev/null +++ b/tutorials/zyda2-tutorial/2_dupes_removal/run_remove_dupes_dolma.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +IN_BASE=$DATA_BASE/processed/dolma-v1_7-cc-parquet +OUT_BASE=$DATA_BASE/deduped/dolma-v1_7-cc-parquet +DUPES_BASE=$DATA_BASE/fuzzy/cc/dupes/dupes_dolma_to_remove.jsonl +N_WORKERS=$CPU_WORKERS + +if test -d $IN_BASE; then + echo "Processing dolma" + python remove_dupes.py \ + --dupes-path $DUPES_BASE/$NAME \ + --input $IN_BASE \ + --output $OUT_BASE \ + --n-workers $N_WORKERS +fi diff --git a/tutorials/zyda2-tutorial/2_dupes_removal/run_remove_dupes_zyda.sh b/tutorials/zyda2-tutorial/2_dupes_removal/run_remove_dupes_zyda.sh new file mode 100644 index 00000000..bdb2754d --- /dev/null +++ b/tutorials/zyda2-tutorial/2_dupes_removal/run_remove_dupes_zyda.sh @@ -0,0 +1,78 @@ +#!/bin/bash + +IN_BASE=$DATA_BASE/processed/zyda-parquet +OUT_BASE=$DATA_BASE/deduped/zyda-parquet +DUPES_BASE=$DATA_BASE/fuzzy/cc/dupes/dupes_zyda_to_remove.jsonl +N_WORKERS=$CPU_WORKERS + +NAME=zyda_arxiv +INPUT=$IN_BASE/$NAME +OUTPUT=$OUT_BASE/$NAME +if test -d $INPUT; then + echo "Processing $NAME" + python remove_dupes.py \ + --dupes-path $DUPES_BASE/$NAME \ + --input $INPUT \ + --output $OUTPUT \ + --n-workers $N_WORKERS +fi + +NAME=zyda_c4-en +INPUT=$IN_BASE/$NAME +OUTPUT=$OUT_BASE/$NAME +if test -d $INPUT; then + echo "Processing $NAME" + python remove_dupes.py \ + --dupes-path $DUPES_BASE/$NAME \ + --input $INPUT \ + --output $OUTPUT \ + --n-workers $N_WORKERS +fi + +NAME=zyda_peS2o +INPUT=$IN_BASE/$NAME +OUTPUT=$OUT_BASE/$NAME +if test -d $INPUT; then + echo "Processing $NAME" + python 
remove_dupes.py \ + --dupes-path $DUPES_BASE/$NAME \ + --input $INPUT \ + --output $OUTPUT \ + --n-workers $N_WORKERS +fi + +NAME=zyda_pile-uncopyrighted +INPUT=$IN_BASE/$NAME +OUTPUT=$OUT_BASE/$NAME +if test -d $INPUT; then + echo "Processing $NAME" + python remove_dupes.py \ + --dupes-path $DUPES_BASE/$NAME \ + --input $INPUT \ + --output $OUTPUT \ + --n-workers $N_WORKERS +fi + +NAME=zyda_refinedweb +INPUT=$IN_BASE/$NAME +OUTPUT=$OUT_BASE/$NAME +if test -d $INPUT; then + echo "Processing $NAME" + python remove_dupes.py \ + --dupes-path $DUPES_BASE/$NAME \ + --input $INPUT \ + --output $OUTPUT \ + --n-workers $N_WORKERS +fi + +NAME=zyda_slimpajama +INPUT=$IN_BASE/$NAME +OUTPUT=$OUT_BASE/$NAME +if test -d $INPUT; then + echo "Processing $NAME" + python remove_dupes.py \ + --dupes-path $DUPES_BASE/$NAME \ + --input $INPUT \ + --output $OUTPUT \ + --n-workers $N_WORKERS +fi diff --git a/tutorials/zyda2-tutorial/3_quality_model/run_quality_classifier.py b/tutorials/zyda2-tutorial/3_quality_model/run_quality_classifier.py new file mode 100644 index 00000000..c19720a7 --- /dev/null +++ b/tutorials/zyda2-tutorial/3_quality_model/run_quality_classifier.py @@ -0,0 +1,49 @@ +import argparse +import os +import time + +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging + +from nemo_curator.classifiers import QualityClassifier +from nemo_curator.datasets import DocumentDataset +from nemo_curator.utils.distributed_utils import get_client, get_num_workers +from nemo_curator.utils.file_utils import get_remaining_files + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) + + +SCHEDULER_FILE = os.environ.get("SCHEDULER_FILE") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Run NeMo quality classifier.") + parser.add_argument("--input", help="Path to the input folder") + parser.add_argument("--output", help="Path to the output folder") + parser.add_argument( + "--batch-size", type=int, default=64, help="Batch size for quality model" + ) + args = parser.parse_args() + + t0 = time.time() + client = get_client(scheduler_file=SCHEDULER_FILE) + logging.info(f"Number of dask workers: {get_num_workers(client)}") + logging.info(client) + + classifier = QualityClassifier(batch_size=args.batch_size) + + raw_base_path = args.input + qm_base_path = args.output + files = get_remaining_files(raw_base_path, qm_base_path, "parquet") + if files: + logging.info(f"Found {len(files)} remaining files for processing") + input_dataset = DocumentDataset.read_parquet( + files, backend="cudf", add_filename=True + ) + result_dataset = classifier(dataset=input_dataset) + result_dataset.to_parquet(qm_base_path, write_to_filename=True) + else: + logging.info("Nothing to be done. 
All files are already processed.") + + logging.info(f"Done in {time.time() - t0}") diff --git a/tutorials/zyda2-tutorial/3_quality_model/run_quality_classifier.sh b/tutorials/zyda2-tutorial/3_quality_model/run_quality_classifier.sh new file mode 100644 index 00000000..249f7e13 --- /dev/null +++ b/tutorials/zyda2-tutorial/3_quality_model/run_quality_classifier.sh @@ -0,0 +1,30 @@ +#!/bin/bash + +# dolma +echo "Processing dolma" +python run_quality_classifier.py --input $DATA_BASE/deduped/dolma-v1_7-cc-parquet --output $DATA_BASE/deduped-with-quality/dolma-v1_7-cc-parquet --batch-size 64 + +# zyda +NAME=zyda_refinedweb +echo "Processing $NAME" +python run_quality_classifier.py --input $DATA_BASE/deduped/zyda-parquet/$NAME --output $DATA_BASE/deduped-with-quality/zyda-parquet/$NAME --batch-size 64 + +NAME=zyda_slimpajama +echo "Processing $NAME" +python run_quality_classifier.py --input $DATA_BASE/deduped/zyda-parquet/$NAME --output $DATA_BASE/deduped-with-quality/zyda-parquet/$NAME --batch-size 64 + +NAME=zyda_c4-en +echo "Processing $NAME" +python run_quality_classifier.py --input $DATA_BASE/deduped/zyda-parquet/$NAME --output $DATA_BASE/deduped-with-quality/zyda-parquet/$NAME --batch-size 64 + +NAME=zyda_pile-uncopyrighted +echo "Processing $NAME" +python run_quality_classifier.py --input $DATA_BASE/deduped/zyda-parquet/$NAME --output $DATA_BASE/deduped-with-quality/zyda-parquet/$NAME --batch-size 64 + +NAME=zyda_peS2o +echo "Processing $NAME" +python run_quality_classifier.py --input $DATA_BASE/deduped/zyda-parquet/$NAME --output $DATA_BASE/deduped-with-quality/zyda-parquet/$NAME --batch-size 64 + +NAME=zyda_arxiv +echo "Processing $NAME" +python run_quality_classifier.py --input $DATA_BASE/deduped/zyda-parquet/$NAME --output $DATA_BASE/deduped-with-quality/zyda-parquet/$NAME --batch-size 64 diff --git a/tutorials/zyda2-tutorial/4_filtering/filter_fwe.py b/tutorials/zyda2-tutorial/4_filtering/filter_fwe.py new file mode 100644 index 00000000..90a1ea97 --- /dev/null +++ b/tutorials/zyda2-tutorial/4_filtering/filter_fwe.py @@ -0,0 +1,34 @@ +import os +import time + +os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging + +import dask.dataframe as dd +import pyarrow as pa +from dask.distributed import Client, LocalCluster + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) + +DATA_BASE = os.environ.get("DATA_BASE") +CPU_WORKERS = os.environ.get("CPU_WORKERS") + + +if __name__ == "__main__": + t0 = time.time() + cluster = LocalCluster(n_workers=CPU_WORKERS, threads_per_worker=2, processes=True) + client = Client(cluster) + + logging.info(f"Filtering...") + INPUT_BASE = os.path.join(DATA_BASE, "processed/fineweb-edu-score-2") + OUTPUT_BASE = os.path.join(DATA_BASE, "zyda2/fwe3") + folders = sorted(os.listdir(INPUT_BASE)) + for folder in folders: + print(f"\nProcessing {folder}") + ddf = dd.read_parquet(os.path.join(INPUT_BASE, folder)) + ddf_filtered = ddf[ddf["int_score"] >= 3].repartition(partition_size="512M") + out_folder = os.path.join(OUTPUT_BASE, folder) + print(f"Saving to {out_folder}") + ddf_filtered.to_parquet(out_folder, write_index=False, overwrite=True) + logging.info(f"Done in {time.time() - t0:.2f} sec") diff --git a/tutorials/zyda2-tutorial/4_filtering/filter_quality.py b/tutorials/zyda2-tutorial/4_filtering/filter_quality.py new file mode 100644 index 00000000..a027b9f4 --- /dev/null +++ b/tutorials/zyda2-tutorial/4_filtering/filter_quality.py @@ -0,0 +1,57 @@ +import argparse +import os +import time + 
+os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False" + +import logging + +import dask.dataframe as dd +import pyarrow as pa +from dask.distributed import Client, LocalCluster + +logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Filter by quality") + parser.add_argument( + "--input", type=str, required=True, help="Path to the folder with input dataset" + ) + parser.add_argument( + "--output", type=str, required=True, help="Path to where write the result" + ) + parser.add_argument( + "--n-workers", type=int, default=64, help="Number of CPU Dask workers" + ) + parser.add_argument( + "--quality_pred", + type=str, + required=True, + choices={"High", "Medium", "Low"}, + help="Quality for filtering", + ) + args = parser.parse_args() + + t0 = time.time() + cluster = LocalCluster( + n_workers=args.n_workers, threads_per_worker=2, processes=True + ) + client = Client(cluster) + + ddf = dd.read_parquet(args.input, split_row_groups=False) + ddf_filtered = ddf[ddf["quality_pred"] == args.quality_pred].repartition( + partition_size="512MB" + ) + ddf_filtered.to_parquet( + args.output, + write_index=False, + overwrite=True, + schema={"quality_prob": pa.list_(pa.float32())}, + ) + ddf_filtered = dd.read_parquet(args.output) + l_after = len(ddf_filtered) + logging.info(f"Done in {time.time() - t0:.2f} sec") + + client.cluster.close() + client.shutdown() diff --git a/tutorials/zyda2-tutorial/4_filtering/run_filter_dolma.sh b/tutorials/zyda2-tutorial/4_filtering/run_filter_dolma.sh new file mode 100644 index 00000000..c06b27a1 --- /dev/null +++ b/tutorials/zyda2-tutorial/4_filtering/run_filter_dolma.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +IN_BASE=$DATA_BASE/deduped/dolma-v1_7-cc-parquet +OUT_BASE=$DATA_BASE/zyda2/dolma-v1_7-cc-crossdeduped-filtered +N_WORKERS=$CPU_WORKERS + +if test -d $IN_BASE; then + echo "Processing dolma" + python filter_quality.py \ + --input $IN_BASE \ + --output $OUT_BASE \ + --quality_pred High \ + --n-workers $N_WORKERS +fi diff --git a/tutorials/zyda2-tutorial/4_filtering/run_filter_zyda.sh b/tutorials/zyda2-tutorial/4_filtering/run_filter_zyda.sh new file mode 100644 index 00000000..99ee1407 --- /dev/null +++ b/tutorials/zyda2-tutorial/4_filtering/run_filter_zyda.sh @@ -0,0 +1,77 @@ +#!/bin/bash + +IN_BASE=$DATA_BASE/deduped/zyda-parquet +OUT_BASE=$DATA_BASE/zyda2/zyda-crossdeduped-filtered +N_WORKERS=$CPU_WORKERS + +NAME=zyda_arxiv +INPUT=$IN_BASE/$NAME +OUTPUT=$OUT_BASE/$NAME +if test -d $INPUT; then + echo "Processing $NAME" + python filter_quality.py \ + --input $INPUT \ + --output $OUTPUT \ + --quality_pred High \ + --n-workers $N_WORKERS +fi + +NAME=zyda_c4-en +INPUT=$IN_BASE/$NAME +OUTPUT=$OUT_BASE/$NAME +if test -d $INPUT; then + echo "Processing $NAME" + python filter_quality.py \ + --input $INPUT \ + --output $OUTPUT \ + --quality_pred High \ + --n-workers $N_WORKERS +fi + +NAME=zyda_peS2o +INPUT=$IN_BASE/$NAME +OUTPUT=$OUT_BASE/$NAME +if test -d $INPUT; then + echo "Processing $NAME" + python filter_quality.py \ + --input $INPUT \ + --output $OUTPUT \ + --quality_pred High \ + --n-workers $N_WORKERS +fi + +NAME=zyda_pile-uncopyrighted +INPUT=$IN_BASE/$NAME +OUTPUT=$OUT_BASE/$NAME +if test -d $INPUT; then + echo "Processing $NAME" + python filter_quality.py \ + --input $INPUT \ + --output $OUTPUT \ + --quality_pred High \ + --n-workers $N_WORKERS +fi + +NAME=zyda_refinedweb +INPUT=$IN_BASE/$NAME +OUTPUT=$OUT_BASE/$NAME +if test -d $INPUT; then + echo "Processing $NAME" 
+ python filter_quality.py \ + --input $INPUT \ + --output $OUTPUT \ + --quality_pred High \ + --n-workers $N_WORKERS +fi + +NAME=zyda_slimpajama +INPUT=$IN_BASE/$NAME +OUTPUT=$OUT_BASE/$NAME +if test -d $INPUT; then + echo "Processing $NAME" + python filter_quality.py \ + --input $INPUT \ + --output $OUTPUT \ + --quality_pred High \ + --n-workers $N_WORKERS +fi diff --git a/tutorials/zyda2-tutorial/README.md b/tutorials/zyda2-tutorial/README.md new file mode 100644 index 00000000..ba382719 --- /dev/null +++ b/tutorials/zyda2-tutorial/README.md @@ -0,0 +1,152 @@ +# Zyda2 +This tutorial demonstrates how to reproduce Zyda2 dataset, that was curated by Zyphra using NeMo Curator: https://huggingface.co/datasets/Zyphra/Zyda2-5T + +## Tutorial structure +Tutorial is split into separate folders each containing scripts for running corresponding steps: +- `0_processing`: scripts for preprocessing individual datasets into the format optimal for NeMo Curator +- `1_fuzzy_dedup`: scripts for running fuzzy deduplication pipeline +- `2_dupes_removal`: scripts for post processing results of fuzzy dedup and removing duplicated documents +- `3_quality_model`: scripts for running inference of Nvidia's quality model +- `4_filtering`: scripts for filtering + +## NeMo Curator setup +Before running this tutorial one needs to set up a Dask cluster, which involves starting one Dask scheduler process on the head node and Dask workers on every compute node. +Below is an example of how things could be configured for running NeMo Curator on multiple nodes of GPUs using Infiniband between the nodes and NVLink between GPUs on the same node. +1. Set the following flags according to you cluster configuration on every node: +``` +export DASK_DISTRIBUTED_UCX__CUDA_COPY=True +export DASK_DISTRIBUTED_UCX__TCP=True +export DASK_DISTRIBUTED_UCX__NVLINK=True +export DASK_DISTRIBUTED_UCX__INFINIBAND=True +export DASK_DISTRIBUTED_UCX__RDMACM=True +export DASK_DISTRIBUTED_RMM__POOL_SIZE=1GB +export PROTOCOL=ucx +``` +2. Set the location of the scheduler file at `SCHEDULER_FILE` +3. Set the network interface you want to use at `INTERFACE` (if unsure, ask your network administrator for what works with your Infiniband setup) +3. Start Dask scheduler on your head node with `DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True DASK_DISTRIBUTED__RMM__POOL_SIZE=$RMM_SCHEDULER_POOL_SIZE dask scheduler --scheduler-file $SCHEDULER_FILE --protocol $PROTOCOL --interface $INTERFACE` +4. Start Dask workers on every compute node with `dask-cuda-worker --enable-tcp-over-ucx --enable-nvlink --enable-infiniband --enable-rdmacm --scheduler-file /shared/yury/nemo_scheduler.json --interface $INTERFACE`. [Optional] To help with potential out-of-memory memory issues due to fragmentation, one can set flags `--rmm-async --rmm-release-threshold `, which will force RMM to release cache when memory usage is higher than specified threshold (this comes with a performance hit). In addition, Dask supports spilling into CPU RAM, it should allow running workloads when there is not enough VRAM, but it comes with a big performance hit; to enable spilling specify `--enable-cudf-spill` flag. + +To comfortably reproduce Zyda2 in 2 days we recommend using a cluster with 8 nodes of H100s (or A100s with 80GB of VRAM, but it will take longer). It could be run with less, but it will run into memory issues and will require spilling into CPU RAM, slowing down processing. 
Scripts in this tutorial assume that all the data is stored on shared storage accessible to all the nodes. However, Dask supports cloud storage (like GCS or AWS S3), so with minor modifications to the scripts one can read from and write to the cloud.
+
+## How to reproduce the Zyda2 dataset
+Below are step-by-step instructions on how to reproduce the Zyda2 dataset.
+Before running the scripts, start the Dask cluster as described in the instructions above, and make sure to set the following environment variables:
+- `DATA_BASE` - base location in your filesystem for saving the results of the processing steps
+- `SCHEDULER_FILE` - file created by the Dask scheduler when the Dask cluster is started
+- `CPU_WORKERS` - number of CPU workers for steps that don't require GPUs
+
+
+### 1. Downloading component datasets
+Most source datasets can simply be downloaded by cloning their respective HuggingFace repositories:
+- DCLM: https://huggingface.co/datasets/mlfoundations/dclm-baseline-1.0-parquet
+- Fineweb-edu-score-2: https://huggingface.co/datasets/HuggingFaceFW/fineweb-edu-score-2
+- Zyda: https://huggingface.co/datasets/Zyphra/Zyda
+Simply clone them inside the `$DATA_BASE/raw` folder using any of the methods HuggingFace recommends (e.g., Git LFS or huggingface-cli: https://huggingface.co/docs/huggingface_hub/en/guides/cli#download-a-dataset-or-a-space). All of the above datasets are already in parquet format, which is suitable for processing with NeMo Curator/Dask.
+
+However, Dolma-CC v1.7 requires special handling, since the Dolma repository only contains links to raw files. One can do the following:
+1. Filter the Dolma v1.7 file links to only the CC component of Dolma. Links can be found here: https://huggingface.co/datasets/allenai/dolma/blob/main/urls/v1_7.txt. Relevant links contain `cc_en_head`, `cc_en_middle`, or `cc_en_tail` in their names.
+2. Download those files, e.g., using wget.
+3. Convert those files to parquet format and save them to `$DATA_BASE/raw/dolma-v1_7-cc-parquet`
+
+The combined raw data contains roughly 12 billion documents, or roughly 11.6 trillion `gpt-neox-20b` tokens.
+
+### 2. Preprocessing
+NeMo Curator is based on Dask. Dask works best when datasets are split into partitions of small size: https://docs.dask.org/en/stable/dataframe-parquet.html#partition-size. This step repartitions the parquet shards to make sure they have an optimal size. After some experimentation we decided to limit the in-memory partition size to 512MB.
+
+In addition, we add unique IDs to every document in this step, so that we can easily identify documents at later stages.
+
+This step can be run on CPUs or GPUs.
+
+Run all the Python scripts in the `0_processing` folder; they will create folders in `$DATA_BASE/processed` for all the component datasets.
+
+### 3. Global fuzzy deduplication
+NeMo Curator implements the minhash LSH fuzzy deduplication technique. The steps involve computing minhashes, identifying duplicate pairs within minhash bands, and then clustering duplicated documents using a connected components computation. Minhash LSH does produce false positives and false negatives, and NeMo Curator supports explicitly checking the Jaccard similarity against anchor documents within buckets to filter out false positives. However, this is computationally expensive, and for Zyda2 we did not perform such a check. Given the parameters of minhash LSH, it is possible to theoretically estimate the rate of false positives/negatives, and in our case it is up to 2-3%, which we find acceptable.
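+
+A quick way to sanity-check these numbers is the standard minhash LSH S-curve: with `b` bands of `r` rows each, a pair of documents with Jaccard similarity `s` lands in at least one common bucket with probability `1 - (1 - s^r)^b`, and the similarity threshold sits roughly at `(1/b)^(1/r)`. The snippet below is a back-of-the-envelope sketch, not one of the tutorial scripts; it plugs in the band/row split described in the next section (8 bands of range 16).
+
+```python
+# Rough estimate of the minhash LSH S-curve for b bands of r rows each.
+# Illustrative only; this is not part of the tutorial pipeline.
+b, r = 8, 16  # 128 minhashes split into 8 bands of range 16
+
+def detection_probability(s: float) -> float:
+    # Probability that a pair with Jaccard similarity s shares at least one LSH bucket
+    return 1.0 - (1.0 - s**r) ** b
+
+threshold = (1.0 / b) ** (1.0 / r)  # similarity at which detection probability is ~50%
+print(f"approximate similarity threshold: {threshold:.2f}")
+for s in (0.7, 0.8, 0.85, 0.9, 0.95):
+    # The false negative rate at similarity s is 1 - detection_probability(s)
+    print(f"s={s:.2f} -> detection probability {detection_probability(s):.3f}")
+```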
+
+The whole procedure is split into four stages, organized in the following scripts in the `1_fuzzy_dedup` folder:
+1. `0_minhash.py` - computing minhashes of every document;
+2. `1_lsh.py` - splitting minhashes into bands/buckets and then shuffling the dataset by the band/bucket id;
+3. `2_buckets_to_edges.py` - generating duplicate pairs, which serve as edges for the connected components computation;
+4. `3_connected_components.py` - computing connected components, which are essentially the clusters of duplicates.
+
+Every step produces its own artifacts, which are used by the subsequent steps.
+
+Fuzzy deduplication can only be run on GPUs in NeMo Curator.
+
+#### 1. Computing minhashes
+The script for computing minhash signatures is located at `1_fuzzy_dedup/0_minhash.py`.
+
+This stage performs the following operations:
+1. Generates character-based 25-grams
+2. Computes minhash signatures of size 128
+3. Saves results in `$DATA_BASE/fuzzy/minhash`
+
+This is the most time-consuming step, though it is embarrassingly parallel and doesn't require much VRAM.
+
+#### 2. Generating LSH buckets
+The script for computing LSH buckets is located at `1_fuzzy_dedup/1_lsh.py`.
+
+For building LSH buckets, we split minhash signatures into 8 bands (each having range 16). This gives us a theoretical 85% Jaccard similarity threshold (meaning that documents with at least 85% similarity are deemed duplicates).
+
+This step performs the following operations:
+1. Splits IDs into dataset_id and doc_id and converts them to integers. This step is no longer necessary, since recent releases of NeMo Curator support long strings on GPUs, but when we started our project this wasn't the default.
+2. Splits the minhashes of all documents into bands
+3. Groups documents into buckets that correspond to identical band values
+4. Shuffles the resultant dataset by bucket, so that documents within the same bucket are in the same Dask partition
+5. Saves results in `$DATA_BASE/fuzzy/lsh`
+
+This is a memory-intensive step, and we recommend running it on as many GPU nodes as possible to avoid spilling into CPU RAM.
+
+#### 3. Generating duplicate pairs
+The script for computing duplicate pairs is located at `1_fuzzy_dedup/2_buckets_to_edges.py`.
+
+This step takes the buckets of duplicated documents from the LSH step and generates a list of duplicate pairs that are subsequently used as edges in the connected components computation.
+
+This step assumes that the results of the LSH computation are shuffled by bucket id; hence, it is very important to set the flag `split_row_groups=False` when reading the LSH buckets dataframe.
+
+Results of this stage are saved in `$DATA_BASE/fuzzy/buckets_to_edges`. This step doesn't consume many resources and can be run on one node.
+
+#### 4. Clustering duplicates using connected components
+The script for clustering duplicates using connected components is located at `1_fuzzy_dedup/3_connected_components.py`.
+
+This stage clusters the identified duplicate documents by finding connected components in a graph, where the nodes are documents and the edges are duplicate pairs.
+
+The results of this stage are saved in `$DATA_BASE/fuzzy/cc`. This stage is memory-intensive, and we recommend running it on as many GPU nodes as possible.
+
+### 4. Identification of documents to remove
+The result of the previous stage is essentially a collection of clusters of duplicated documents. Now we need to decide which of them to actually remove. Scripts in this stage can be run on CPUs.
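+
+To make the bookkeeping in this stage concrete, here is a minimal pandas sketch of the idea behind `2_dupes_removal/2_compute_counts.py` and `3_prep_dupes.py`: group duplicated documents by their cluster (connected component) ID, count how many members of each cluster come from each source, and keep only the clusters that span more than one source. The column and source names here are illustrative; the real scripts operate on Dask dataframes produced by the connected components stage.
+
+```python
+import pandas as pd
+
+# Toy stand-in for the connected components output: one row per duplicated document
+cc = pd.DataFrame(
+    {
+        "group": [0, 0, 0, 1, 1, 2, 2],  # cluster id from connected components
+        "source": ["fwe2", "dclm", "dolma-cc", "dclm", "dclm", "zyda", "dolma-cc"],
+        "doc_id": ["a", "b", "c", "d", "e", "f", "g"],
+    }
+)
+
+# Per-cluster counts of documents coming from each source
+counts = cc.groupby("group")["source"].value_counts().unstack(fill_value=0)
+
+# Keep only clusters whose duplicates span more than one source;
+# clusters internal to a single dataset are left alone
+cross_dataset = counts[(counts > 0).sum(axis=1) > 1]
+print(cross_dataset)
+```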
+
+Since the source datasets were already deduplicated with their own strategies, we decided to only remove duplicates found across datasets. We perform this in several steps.
+
+#### 1. Conversion of IDs back to strings
+First we need to convert IDs back to their original strings, so that we are able to find documents in the datasets (if you don't perform the ID conversion during the LSH step, this can be skipped). This is done in two steps:
+1. Generate the ID mapping using the `2_dupes_removal/1_id_conversion.py` script. This must be run on GPUs (even a single GPU is enough), since it requires running a hashing function from the `cudf` package.
+2. Apply the mapping to the results of connected components, converting IDs into their original form.
+
+After the conversion we are ready to generate a list of documents to remove.
+
+#### 2. Identifying documents to remove
+For simplicity we explicitly group all the duplicates by their cluster ID, then compute the counts of duplicate documents per source in every cluster and save the results to disk. This is done using the script at `2_dupes_removal/2_compute_counts.py`.
+
+Then we identify the cross-dataset duplicates to remove in the script `2_dupes_removal/3_prep_dupes.py`. There we use the following ranking of the sources (from highest to lowest): Fineweb-edu-score-2 -> DCLM -> Zyda1 -> Dolma-CC. We only flag duplicates that appear across several datasets, leaving duplicates internal to a single dataset intact. Because Fineweb-edu-score-2 has the top ranking, we don't remove any dupes from it.
+
+Then we convert the identified document IDs into a format suitable for easy removal of documents. The scripts `2_dupes_removal/4_get_dupes_*.py` perform this operation for every component. Every ID generated in the preprocessing step encodes the folder and partition the document comes from, as well as the explicit row within that partition. Once we decode this information, removing duplicates is straightforward.
+
+The removal of duplicates is performed by the bash scripts `2_dupes_removal/run_remove_dupes*.sh`, which run the Python script `2_dupes_removal/remove_dupes.py` for every component.
+
+The deduplicated datasets are saved in the `$DATA_BASE/deduped` folder (except for DCLM, which we save in the `$DATA_BASE/zyda2` folder, as it is already in its final version).
+
+### 5. Running quality model predictions
+We ran a quality model classifier on the Zyda1 and Dolma-CC v1.7 portions of our dataset. To run the prediction, use the bash script `3_quality_model/run_quality_classifier.sh`. It calls the Python script `3_quality_model/run_quality_classifier.py` for all the components. All the results are saved in `$DATA_BASE/deduped-with-quality`. This step must be run on GPUs.
+
+### 6. Filtering
+As the final step we perform filtering on some components of our dataset.
+
+We convert Fineweb-edu-score-2 into Fineweb-edu by keeping only the documents with an edu score >=3. In principle, this dataset should be the same as the official version of Fineweb-edu. However, to be consistent we performed our own filtering in the script `4_filtering/filter_fwe.py`.
+
+We only keep documents marked as High quality in Zyda1 and Dolma-CC. To filter those datasets, run the scripts `4_filtering/run_filter_zyda.sh` and `4_filtering/run_filter_dolma.sh`.
+
+The results are saved in the `$DATA_BASE/zyda2` folder.
+
+### 7. Final dataset
+The final datasets can be found in `$DATA_BASE/zyda2`, organized in folders corresponding to different components.
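+
+As a final sanity check, one can load each component folder with Dask and count the resulting documents. This is a small sketch rather than part of the tutorial; the folder layout is assumed to be one subfolder per component under `$DATA_BASE/zyda2`, and for components with nested shard folders you may need to point at the individual subfolders instead.
+
+```python
+import os
+
+import dask.dataframe as dd
+
+DATA_BASE = os.environ.get("DATA_BASE")
+ZYDA2_BASE = os.path.join(DATA_BASE, "zyda2")
+
+# Count documents per component of the final dataset
+for component in sorted(os.listdir(ZYDA2_BASE)):
+    ddf = dd.read_parquet(os.path.join(ZYDA2_BASE, component))
+    print(f"{component}: {len(ddf):,} documents")
+```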