Commit
clean up notebook and fix pre commit

Signed-off-by: Yang Yu <yayu@yayu-mlt.client.nvidia.com>
Yang Yu committed Oct 3, 2024
1 parent 133cb2d commit a2df246
Showing 4 changed files with 59 additions and 83 deletions.
@@ -31,4 +31,4 @@ filters:
- name: nemo_curator.filters.heuristic_filter.WordCountFilter
params:
min_words: 50
max_words: 100000
max_words: 100000
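
For reference, here is a minimal sketch of how a word-count filter configured like the YAML above could be applied in Python with NeMo Curator. The filter class and its `min_words`/`max_words` parameters come from the config itself; the file paths, the `text` field name, and the `ScoreFilter`/`DocumentDataset` wrapping are illustrative assumptions, not part of this commit:

```python
from nemo_curator import ScoreFilter
from nemo_curator.datasets import DocumentDataset
from nemo_curator.filters import WordCountFilter

# Load JSONL documents into a DocumentDataset (directory path is a placeholder).
dataset = DocumentDataset.read_json("rpv2-2023-06/", backend="pandas")

# Keep documents whose word count lies within the bounds from the config above.
filter_step = ScoreFilter(
    WordCountFilter(min_words=50, max_words=100000),
    text_field="text",
)
filtered = filter_step(dataset)

# Write the surviving documents back out as JSONL (directory path is a placeholder).
filtered.to_json("rpv2-2023-06-filtered/")
```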
24 changes: 14 additions & 10 deletions tutorials/pretraining-data-curation/helper.py
@@ -1,21 +1,23 @@
import gzip
import os
import json
import os

import cudf
import dask.bag as db


def convert_single_file(input_output_paths):
input_path, output_path = input_output_paths
with gzip.open(input_path, 'rt', encoding='utf-8') as f_in:
with open(output_path, 'w', encoding='utf-8') as f_out:

with gzip.open(input_path, "rt", encoding="utf-8") as f_in:
with open(output_path, "w", encoding="utf-8") as f_out:
for line in f_in:
try:
# Parse each line as a separate JSON object
item = json.loads(line)
# Write the JSON object to the .jsonl file
json.dump(item, f_out)
f_out.write('\n')
f_out.write("\n")
except json.JSONDecodeError as e:
print(f"Error decoding JSON in file {input_path}: {e}")
continue
@@ -24,16 +26,18 @@ def convert_single_file(input_output_paths):
def convert_json_gz_to_jsonl(input_dir, output_dir, partition_size=2):
# Ensure the output directory exists
os.makedirs(output_dir, exist_ok=True)

# List all .json.gz files in the input directory
file_paths = []
for filename in os.listdir(input_dir):
if filename.endswith('.json.gz'):
if filename.endswith(".json.gz"):
input_path = os.path.join(input_dir, filename)
output_filename = os.path.splitext(os.path.splitext(filename)[0])[0] + '.jsonl'
output_filename = (
os.path.splitext(os.path.splitext(filename)[0])[0] + ".jsonl"
)
output_path = os.path.join(output_dir, output_filename)
file_paths.append((input_path, output_path))

# Create a Dask bag from the file paths and apply the function in parallel
bag = db.from_sequence(file_paths, partition_size=partition_size)
bag.map(convert_single_file).compute()
@@ -47,4 +51,4 @@ def convert_str_id_to_int(df, id_column="id"):
dx = df[id_column].str.rsplit("-", n=1, expand=True)
df["doc_id"] = dx[1].astype("int64").values
df["dataset_id"] = dx[0].hash_values()
return df
return df
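
A short usage sketch for the two helpers above; the directory names and id values are illustrative only, assuming ids of the form `<prefix>-<number>` as used elsewhere in the tutorial:

```python
import cudf

from helper import convert_json_gz_to_jsonl, convert_str_id_to_int

# Decompress every .json.gz file in the raw directory into .jsonl files,
# processing the files in parallel with a Dask bag.
convert_json_gz_to_jsonl("rpv2-2023-06-raw", "rpv2-2023-06", partition_size=2)

# convert_str_id_to_int splits a string id on its last "-", hashing the prefix
# into dataset_id and casting the numeric suffix to an integer doc_id.
df = cudf.DataFrame({"id": ["rpv2-2023-06-0", "rpv2-2023-06-1"]})
df = convert_str_id_to_int(df, id_column="id")
print(df[["dataset_id", "doc_id"]])
```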
@@ -5,28 +5,15 @@
"id": "8498f9f4-3d9a-481e-80fb-af41f7d7aa7d",
"metadata": {},
"source": [
"# Pre-training Data Curation in NeMo Curator\n",
"\n",
"The NeMo Curator is a Python library that consists of a collection of scalable data-mining modules for curating natural language processing (NLP) data for training large language models (LLMs). The modules within the NeMo Data Curator enable NLP researchers to mine high-quality text at scale from massive uncurated web corpora.\n",
"\n",
"NeMo Curator includes the following modules to perform data curation:\n",
"\n",
"- Data download and Extraction\n",
"- Language identification and separation\n",
"- Text reformatting and cleaning\n",
"- Quality filtering\n",
"- Document-level deduplication\n",
"- Multilingual downstream-task decontamination\n",
"- Distributed Data Classification\n",
"- Personal identifiable information (PII) redaction"
"# Pretraining Data Curation in NeMo Curator"
]
},
{
"cell_type": "markdown",
"id": "28d17c49",
"metadata": {},
"source": [
"# Table of Contents\n",
"## Table of Contents\n",
"\n",
"1. [Introduction](#introduction)\n",
"2. [Getting Started](#get-start)\n",
@@ -36,6 +23,24 @@
"6. [Quality filtering](#filter)"
]
},
{
"cell_type": "markdown",
"id": "4c55d981",
"metadata": {},
"source": [
"# 1. Introduction\n",
"<a id=\"introduction\"></a>\n",
"\n",
"In this tutorial, we will show how to curate large-scale data for LLM pretraining in a distributed environment using NeMo-Curator. Specifically, we will focus on the following modules in NeMo-Curator:\n",
"\n",
"- Language identification and separation\n",
"- Text reformatting and cleaning\n",
"- Quality filtering\n",
"- Document-level deduplication\n",
"\n",
"For demonstration, we will use the [RedPajama-Data-v2](#rpv2) dataset, an open dataset for LLM pretraining."
]
},
{
"cell_type": "markdown",
"id": "520eef06-0edb-4108-a048-af006dea8601",
@@ -44,25 +49,20 @@
"tags": []
},
"source": [
"# Introduction\n",
"<a id=\"introduction\"></a>\n",
"\n",
"In this tutorial, we will be demonstrating how to curate a LLM pre-training dataset using NeMo Curator.\n",
"\n",
"## System Information\n",
"## 1.1 System Information\n",
"Here is the information on the system this notebook was run on:\n",
"\n",
"- **GPU**: 2 A100 nodes (each with 8 A100-SXM4-80GB)\n",
"\n",
"- **CUDA & Nvidia Drivers**: CUDA 12.2 with Driver 535.104.12\n",
"- **CUDA & Nvidia Drivers**: CUDA 12.4 with Driver 535.104.12\n",
"\n",
"- **OS**: Ubuntu 20.04.5 LTS\n",
"- **OS**: Ubuntu 22.04.4 LTS\n",
"\n",
"## Running NeMo-Curator\n",
"## 1.2 Running NeMo-Curator\n",
"\n",
"NeMo-curator came pre-installed in Nemo framework container. This notebook use 24.07 release of the Nemo framework container. User can pull the container following the steps below:\n",
"NeMo-curator came pre-installed in Nemo Framework container. This notebook use 24.07 release of the NeMo Framework container. User can pull the container following the steps below:\n",
"\n",
"- Get access to the NeMo Frameworm container on [NGC](https://catalog.ngc.nvidia.com/orgs/nvidia/containers/nemo)\n",
"- Get access to the NeMo Framework container on [NGC](https://catalog.ngc.nvidia.com/orgs/nvidia/containers/nemo)\n",
"\n",
"- Set your docker credentials\n",
"\n",
@@ -85,7 +85,7 @@
"id": "7d57dd35-cce6-4bfa-b34a-fb4a2ea584e0",
"metadata": {},
"source": [
"# Getting started\n",
"# 2. Getting started\n",
"<a id=\"get-start\"></a>\n",
"\n",
"NeMo-Curator uses dask for parallelization. Before we start using curator, we need to start a dask cluster. To start a multi-node dask cluster in slurm, we can use the `start-distributed-notebook.sh` script in this directory to start the cluster. The user will want to change the following:\n",
@@ -177,7 +177,7 @@
"id": "bf008174-a7b6-4a62-b421-0e3d84e305f2",
"metadata": {},
"source": [
"# RedPajama-Data-v2\n",
"# 3. RedPajama-Data-v2\n",
"<a id=\"rpv2\"></a>"
]
},
@@ -195,16 +195,6 @@
"The raw rpv2 data is stored in compressed json. We will first decompress the json.gz file and write them into jsonl files. For this, we will use a helper function `convert_json_gz_to_jsonl` in `helper.py`\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8d13a997",
"metadata": {},
"outputs": [],
"source": [
"from helper import convert_json_gz_to_jsonl"
]
},
{
"cell_type": "code",
"execution_count": 4,
@@ -222,6 +212,8 @@
}
],
"source": [
"from helper import convert_json_gz_to_jsonl\n",
"\n",
"input_data_dir = os.path.join(base_dir,\"rpv2-2023-06-raw\")\n",
"output_data_dir = os.path.join(base_dir,\"rpv2-2023-06\")\n",
"\n",
@@ -368,7 +360,7 @@
"id": "22b85d83-fd01-49cd-8618-006cf6806461",
"metadata": {},
"source": [
"Removing the raw dataset to save disk space:"
"[Optional] Removing the raw dataset to save disk space:"
]
},
{
@@ -380,7 +372,7 @@
},
"outputs": [],
"source": [
"!rm -rf /lustre/fsw/portfolios/coreai/users/yayu/data.fs5/rpv2-2023-06"
"!rm -rf {base_dir}/rpv2-2023-06"
]
},
{
@@ -485,7 +477,7 @@
},
"outputs": [],
"source": [
"!rm -rf /lustre/fsw/portfolios/coreai/users/yayu/data.fs5/rpv2-2023-06-sharded"
"!rm -rf {base_dir}/rpv2-2023-06-sharded"
]
},
{
@@ -535,30 +527,12 @@
},
{
"cell_type": "code",
"execution_count": 4,
"execution_count": null,
"id": "2b7e6c9b-2aa3-4a8b-89ea-6dc4ca585b8d",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"--2024-08-23 16:03:29-- https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin\n",
"Resolving dl.fbaipublicfiles.com (dl.fbaipublicfiles.com)... 13.227.74.12, 13.227.74.45, 13.227.74.118, ...\n",
"Connecting to dl.fbaipublicfiles.com (dl.fbaipublicfiles.com)|13.227.74.12|:443... connected.\n",
"HTTP request sent, awaiting response... 200 OK\n",
"Length: 131266198 (125M) [application/octet-stream]\n",
"Saving to: ‘/lustre/fsw/portfolios/coreai/users/yayu/data.fs5/rpv2-2023-06-language/lid.176.bin’\n",
"\n",
"lid.176.bin 100%[===================>] 125.18M 39.2MB/s in 4.9s \n",
"\n",
"2024-08-23 16:03:35 (25.5 MB/s) - ‘/lustre/fsw/portfolios/coreai/users/yayu/data.fs5/rpv2-2023-06-language/lid.176.bin’ saved [131266198/131266198]\n",
"\n"
]
}
],
"outputs": [],
"source": [
"!wget https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin -P {model_path}"
]
@@ -678,7 +652,7 @@
},
"outputs": [],
"source": [
"!rm -rf \"/lustre/fsw/portfolios/coreai/users/yayu/data.fs5/rpv2-2023-06-id\""
"!rm -rf {base_dir}/rpv2-2023-06-id"
]
},
{
@@ -802,7 +776,7 @@
},
"outputs": [],
"source": [
"!rm -rf \"/lustre/fsw/portfolios/coreai/users/yayu/data.fs5/rpv2-2023-06-language/data/EN\""
"!rm -rf {base_dir}/rpv2-2023-06-language/data/EN"
]
},
{
@@ -2948,7 +2922,6 @@
],
"source": [
"output_path = os.path.join(base_dir, \"fuzzy-dedup-output-2023-06/connected_components.parquet\")\n",
"# As i repartition (i dont need to shuffle the whole thing \n",
"cc_result = dask_cudf.read_parquet(output_path, split_row_groups=False).repartition(npartitions=1)\n",
"\n",
"# Set 'group' as the index and shuffle to ensure all same 'group' values are in the same partition\n",
Expand All @@ -2961,10 +2934,10 @@
" df = df.drop(columns=['cumcount'])\n",
" return df\n",
"\n",
"# Apply the function to each partition\n",
"# Find duplicates by applying the function to each partition\n",
"docs_to_remove = cc_result.map_partitions(assign_cumcount, meta=cc_result)\n",
"\n",
"# Reset the index if necessary\n",
"# Reset the index\n",
"docs_to_remove = docs_to_remove.reset_index()\n",
"\n",
"docs_to_remove = docs_to_remove[[\"dataset_id\", \"doc_id\"]]\n",
@@ -4665,7 +4638,6 @@
],
"source": [
"output_path = os.path.join(base_dir, \"fuzzy-dedup-output-2023-06-and-14/connected_components.parquet\")\n",
"# As i repartition (i dont need to shuffle the whole thing \n",
"cc_result = dask_cudf.read_parquet(output_path, split_row_groups=False).repartition(npartitions=1)\n",
"\n",
"# Set 'group' as the index and shuffle to ensure all same 'group' values are in the same partition\n",
Expand All @@ -4678,10 +4650,10 @@
" df = df.drop(columns=['cumcount'])\n",
" return df\n",
"\n",
"# Apply the function to each partition\n",
"# Find duplicates by applying the function to each partition\n",
"docs_to_remove = cc_result.map_partitions(assign_cumcount, meta=cc_result)\n",
"\n",
"# Reset the index if necessary\n",
"# Reset the index\n",
"docs_to_remove = docs_to_remove.reset_index()\n",
"\n",
"docs_to_remove = docs_to_remove[[\"dataset_id\", \"doc_id\"]]\n",
Expand Down Expand Up @@ -4718,11 +4690,11 @@
}
],
"source": [
"output_resharded_dir = expand_outdir_and_mkdir(\"/lustre/fsw/portfolios/coreai/users/yayu/data.fs5/rpv2-2023-06-and-14-deduped-resharded\")\n",
"output_resharded_dir = expand_outdir_and_mkdir(os.path.join(base_dir, \"rpv2-2023-06-and-14-deduped-resharded\"))\n",
"\n",
"t0 = time.time()\n",
"reshard_jsonl(\n",
" '/lustre/fsw/portfolios/coreai/users/yayu/data.fs5/rpv2-2023-06-and-14-deduped',\n",
" os.path.join(base_dir, \"rpv2-2023-06-and-14-deduped\"),\n",
" output_resharded_dir,\n",
" output_file_size=\"100M\",\n",
" start_index=0,\n",
@@ -75,4 +75,4 @@ export DASK_DATAFRAME__QUERY_PLANNING=False
srun \
--container-mounts=${MOUNTS} \
--container-image=${CONTAINER_IMAGE} \
${CONTAINER_ENTRYPOINT}
${CONTAINER_ENTRYPOINT}
