From e3718ccb06379a0ba59f0a0bbb5403eb5a05a23c Mon Sep 17 00:00:00 2001
From: hoangphu7122002
Date: Mon, 14 Oct 2024 11:23:00 +0700
Subject: [PATCH] add tutorials/pretraining-Vietnamese-data-curation

---
 ...pretraining-Vietnamese-data-curation.ipynb | 655 ++++++++++++++++++
 1 file changed, 655 insertions(+)
 create mode 100644 tutorials/pretraining-Vietnamese-data-curation/pretraining-Vietnamese-data-curation.ipynb

diff --git a/tutorials/pretraining-Vietnamese-data-curation/pretraining-Vietnamese-data-curation.ipynb b/tutorials/pretraining-Vietnamese-data-curation/pretraining-Vietnamese-data-curation.ipynb
new file mode 100644
index 00000000..a3cec394
--- /dev/null
+++ b/tutorials/pretraining-Vietnamese-data-curation/pretraining-Vietnamese-data-curation.ipynb
@@ -0,0 +1,655 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Prerequisites and Environment Setup"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "-\tCUDA and NVIDIA Drivers: CUDA 12.3 with Driver 545.23.08\n",
+    "-\tUbuntu 22.04\n",
+    "-\t[NVIDIA-container-toolkit](https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/latest/install-guide.html) version 1.15.0\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Install NeMo Curator by following the instructions for the CPU and CUDA-accelerated modules in the README file of the [NeMo Curator repository](https://github.com/NVIDIA/NeMo-Curator/tree/main)."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Next, install two additional packages that are needed later: `datasets` and `jsonlines`.\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "!pip install datasets\n",
+    "!pip install jsonlines"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "To proceed with data processing, we need to set up a Dask environment. Dask is a flexible, open-source library that enables parallel and distributed computing in Python, allowing us to scale computations across multiple cores or even clusters. By distributing tasks, Dask makes the data handling process significantly faster and more efficient."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "**Note:** This notebook was run on a DGX A100 with a 128-core CPU and 2TB of RAM to handle the dataset size. Depending on your dataset and computing resources, you may need to adjust the Dask worker configuration (in the next section) accordingly."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import nemo_curator\n",
+    "from dask.distributed import Client, LocalCluster\n",
+    "# Start a Dask cluster with 12 workers, each limited to 64GB of memory. You might need to adjust these numbers according to your computing resources\n",
+    "cluster = LocalCluster(n_workers=12, processes=True, memory_limit='64GB')\n",
+    "client = Client(cluster)\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Data Collection"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Each dataset is accessed and downloaded using the Hugging Face Hub, with additional steps required for OSCAR due to its access restrictions. 
For OSCAR, you need to accept the conditions on the [dataset page](https://huggingface.co/datasets/oscar-corpus/OSCAR-2301) and use a [Hugging Face access token](https://huggingface.co/docs/hub/en/security-tokens) for downloading." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**Download and Convert Datasets to Parquet**\n", + "\n", + "The conversion of dataset into Parquet format facilitates efficient handling and processing of large datasets" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "from datasets import load_dataset as load_hf_dataset\n", + "from datasets import DownloadConfig \n", + "\n", + "data_dir = \"./datasets/\"\n", + "download_config = DownloadConfig(num_proc=4)\n", + "\n", + "# Load and save Vietnamese Wikipedia dataset\n", + "ds = load_hf_dataset(\"wikimedia/wikipedia\", \"20231101.vi\")\n", + "ds['train'].to_parquet(os.path.join(data_dir, \"wiki_vi_231101.parquet\"))\n", + "\n", + "# Load and save Vietnamese news corpus\n", + "ds = load_hf_dataset(\"jetaudio/binhvq_news\")\n", + "ds['train'].to_parquet(os.path.join(data_dir, \"binhvq_news_train.parquet\"))\n", + "\n", + "# Load and save OSCAR dataset\n", + "ds = load_hf_dataset(\"oscar-corpus/OSCAR-2301\", language='vi', token=True, download_config=download_config, trust_remote_code=True)\n", + "ds['train'].to_parquet(os.path.join(data_dir, 'oscar_vi.parquet'))\n", + "\n", + "# Load and save C4 dataset\n", + "ds = load_hf_dataset(\"allenai/c4\", data_files='multilingual/c4-vi.*.json.gz', download_config=download_config, trust_remote_code=True)\n", + "ds['train'].to_parquet(os.path.join(data_dir, \"c4_vi.parquet\"))\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**Combine and Standardize Format**\n", + "\n", + "We then combine them into a single dataset, keeping only the 'text' column. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from datasets import concatenate_datasets\n", + "# Combine datasets and standardize format\n", + "datasets = [os.path.join(data_dir, file) for file in [\"wiki_vi_231101.parquet\", \"c4_vi.parquet\", 'oscar_vi.parquet', \"binhvq_news_train.parquet\"]]\n", + "\n", + "data_files = {\"train\": datasets[0]}\n", + "ds = load_hf_dataset(\"parquet\", data_files=data_files)\n", + "ds = ds['train'].remove_columns([col for col in ds['train'].column_names if col != 'text'])\n", + "\n", + "for d in datasets[1:]:\n", + " ds_ = load_hf_dataset(\"parquet\", data_files={\"train\": d})\n", + " ds_ = ds_['train'].remove_columns([col for col in ds_['train'].column_names if col != 'text'])\n", + " ds = concatenate_datasets([ds, ds_])\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**Shard the Combined Dataset**\n", + "\n", + "The combined dataset is then sharded into smaller chunks. Sharding is performed to distribute the data evenly across multiple workers in the Dask cluster, facilitating efficient parallel processing during the data curation stages." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Define paths for raw data\n", + "raw_data_directory = os.path.join(data_dir, \"raw\")\n", + "\n", + "# Shard the dataset\n", + "num_shards = 256\n", + "for shard_idx in range(num_shards):\n", + " shard = ds.shard(index=shard_idx, num_shards=num_shards)\n", + " shard.to_parquet(os.path.join(raw_data_directory, f\"{shard_idx}.parquet\"))\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Data Curation flow" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Unicode reformatting" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Unicode reformatting is an essential preprocessing step to ensure that text data is standardized and free of encoding errors, which are common in web-crawled datasets" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from nemo_curator import Modify\n", + "from nemo_curator.modifiers import UnicodeReformatter\n", + "from nemo_curator.utils.distributed_utils import read_data, write_to_disk\n", + "from nemo_curator.utils.file_utils import get_all_files_paths_under\n", + "from nemo_curator.datasets import DocumentDataset\n", + "\n", + "# Define paths for Unicode formatted data\n", + "unicode_formatted_output_path = os.path.join(data_dir, \"formatted\")\n", + "\n", + "def load_dataset(input_data_dir, file_type='parquet'):\n", + " files = list(get_all_files_paths_under(input_data_dir))\n", + " raw_data = read_data(files, file_type=file_type, backend=\"pandas\", add_filename=True)\n", + " dataset = DocumentDataset(raw_data)\n", + "\n", + " return dataset\n", + "\n", + "# Load the raw data\n", + "raw_data = load_dataset(raw_data_directory, file_type='parquet')\n", + "\n", + "# Initialize the Unicode reformatter\n", + "cleaner = Modify(UnicodeReformatter())\n", + "\n", + "# Apply Unicode reformatting\n", + "cleaned_data = cleaner(raw_data)\n", + "\n", + "# Save the cleaned data to disk\n", + "write_to_disk(cleaned_data.df, unicode_formatted_output_path, write_to_filename=True, output_type='parquet')\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Adding Custom IDs to Documents" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Before proceeding with further curation steps, it is advisable to preprocess the dataset by adding a unique ID to each document. These IDs serve as trackers that help in identifying duplicate or low-quality documents throughout the curation process, ensuring that each document remains uniquely identifiable throughout processing.
\n",
+    "\n",
+    "NeMo Curator offers an *AddId* class, which allows users to insert custom IDs into documents using a specified prefix format, such as `<prefix>_<id>`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from nemo_curator import AddId\n",
+    "\n",
+    "# Define paths for input data and output with added IDs\n",
+    "add_id_input_data_dir = unicode_formatted_output_path\n",
+    "added_id_output_path = os.path.join(data_dir, \"add_id\")\n",
+    "add_ID_id_prefix = \"VI_\"\n",
+    "\n",
+    "# Load the formatted dataset\n",
+    "dataset = load_dataset(add_id_input_data_dir, file_type='parquet')\n",
+    "\n",
+    "# Initialize the AddId class with a specified prefix and start index\n",
+    "add_id = AddId(id_field='id', id_prefix=add_ID_id_prefix, start_index=0)\n",
+    "\n",
+    "# Apply the ID addition to the dataset\n",
+    "id_dataset = add_id(dataset)\n",
+    "\n",
+    "# Save the dataset with added IDs to disk\n",
+    "write_to_disk(id_dataset.df, output_file_dir=added_id_output_path, write_to_filename=True, output_type='parquet')\n"
+   ]
+  },
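+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Optionally, as a quick sanity check (this step is an addition for illustration and is not required by the rest of the pipeline), you can inspect a few rows to confirm that each document now carries a unique `id`. Since `id_dataset.df` is a Dask DataFrame, `head()` only computes a handful of rows from the first partition."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Optional sanity check: peek at a few documents to verify the added 'id' column\n",
+    "id_dataset.df.head()"
+   ]
+  },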
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Exact deduplication"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Exact deduplication removes identical duplicates from the dataset. By eliminating exact duplicates, we ensure that each data point contributes uniquely to the training process, enhancing the diversity and overall quality of the dataset."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "In this stage, we'll leverage GPU acceleration by utilizing a Dask CUDA cluster. Since the current cluster is CPU-based, we need to shut it down and start a new one with GPU support.\n",
+    "\n",
+    "To close the existing cluster:\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "client.cluster.close()\n",
+    "client.shutdown()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Then, to initialize the GPU Dask cluster:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from nemo_curator.utils.distributed_utils import get_client\n",
+    "\n",
+    "\n",
+    "def pre_imports():\n",
+    "    import cudf\n",
+    "\n",
+    "\n",
+    "client = get_client(cluster_type='gpu', set_torch_to_use_rmm=False)\n",
+    "client.run(pre_imports)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Below is the implementation of exact deduplication."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from nemo_curator.modules import ExactDuplicates\n",
+    "\n",
+    "\n",
+    "# Define input and output paths\n",
+    "exact_dedup_input_dataset_dir = added_id_output_path\n",
+    "exact_dedup_base_output_path = os.path.join(data_dir, \"exact_dedup\")\n",
+    "exact_dedup_log_dir = os.path.join(exact_dedup_base_output_path, 'log')\n",
+    "exact_dedup_output_dir = os.path.join(exact_dedup_base_output_path, 'data')\n",
+    "deduped_output_dir = os.path.join(data_dir, \"remove_duplicate\")\n",
+    "\n",
+    "\n",
+    "# Create directories for logs and output\n",
+    "!mkdir -p {exact_dedup_log_dir}\n",
+    "!mkdir -p {exact_dedup_output_dir}\n",
+    "!mkdir -p {deduped_output_dir}\n",
+    "\n",
+    "\n",
+    "# Parameters for ExactDuplicates\n",
+    "exact_dedup_dataset_id_field = \"id\"\n",
+    "exact_dedup_dataset_text_field = \"text\"\n",
+    "\n",
+    "\n",
+    "# Load the input dataset\n",
+    "input_dataset = DocumentDataset.read_parquet(exact_dedup_input_dataset_dir, backend='cudf')\n",
+    "\n",
+    "\n",
+    "# Initialize and run exact deduplication\n",
+    "exact_dup = ExactDuplicates(\n",
+    "    logger=exact_dedup_log_dir,\n",
+    "    id_field=exact_dedup_dataset_id_field,\n",
+    "    text_field=exact_dedup_dataset_text_field,\n",
+    "    hash_method=\"md5\",\n",
+    "    cache_dir=exact_dedup_output_dir\n",
+    ")\n",
+    "duplicates = exact_dup(dataset=input_dataset)\n",
+    "\n",
+    "\n",
+    "print(f\"Number of exact duplicate documents: {len(duplicates)}\")\n",
+    "\n",
+    "\n",
+    "# Reload the dataset and the identified exact duplicates, then remove the duplicated documents by ID\n",
+    "input_dataset = DocumentDataset.read_parquet(added_id_output_path, backend='cudf')\n",
+    "exact_duplicates = DocumentDataset.read_parquet(\n",
+    "    os.path.join(exact_dedup_output_dir, \"_exact_duplicates.parquet\"), backend='cudf'\n",
+    ")\n",
+    "\n",
+    "\n",
+    "# Extract the IDs of documents to remove (all but the first occurrence of each duplicate hash)\n",
+    "exact_docs_to_remove = exact_duplicates.df.map_partitions(\n",
+    "    lambda x: x[x._hashes.duplicated(keep=\"first\")]\n",
+    ")\n",
+    "\n",
+    "\n",
+    "# Remove duplicated documents from the input dataset\n",
+    "result = input_dataset.df[\n",
+    "    ~input_dataset.df[exact_dedup_dataset_id_field].isin(exact_docs_to_remove[exact_dedup_dataset_id_field].compute())\n",
+    "]\n",
+    "\n",
+    "\n",
+    "# Save the final deduplicated dataset\n",
+    "write_to_disk(result, output_file_dir=deduped_output_dir, write_to_filename=True, output_type='parquet')"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Close the GPU Dask cluster"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "client.cluster.close()\n",
+    "client.shutdown()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Heuristic Quality Filtering"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Heuristic quality filtering is designed to enhance the quality of the dataset by removing low-quality content based on predefined heuristics. This approach involves applying a series of filters to the dataset to eliminate undesirable data characteristics such as excessive special characters, overly short or long texts, or other criteria that could negatively impact model performance.\n",
+    "\n",
+    "We use a YAML file to define the heuristic filters. The configuration can be found [here](https://raw.githubusercontent.com/NVIDIA/NeMo-Curator/main/config/heuristic_filter_non-en.yaml). This file lists the filtering criteria and settings used to build a filter pipeline. You can customize the filters or change thresholds based on your needs. The *filter_pipeline* helper reads the YAML settings and applies each filter to the dataset step by step.\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Re-create a CPU Dask Cluster"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Start a Dask cluster with 12 workers, each limited to 64GB of memory. 
You might need to adjust these numbers according to your computing resources\n", + "cluster = LocalCluster(n_workers=12, processes=True, memory_limit= '64GB')\n", + "client = Client(cluster)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from nemo_curator.utils.config_utils import build_filter_pipeline\n", + "import warnings\n", + "\n", + "# Define paths for input data and output data after heuristic filtering\n", + "HF_input_data_dir = deduped_output_dir\n", + "HF_output_path = os.path.join(data_dir, 'heuristic_filtering')\n", + "\n", + "# Create a directory for the configuration file if it doesn't exist\n", + "os.makedirs('config', exist_ok=True)\n", + "# Download the YAML configuration file for heuristic filtering\n", + "!wget https://raw.githubusercontent.com/NVIDIA/NeMo-Curator/main/config/heuristic_filter_non-en.yaml -O ./config/heuristic_filter_non-en.yaml\n", + "\n", + "# Specify the path to the configuration file\n", + "filter_config_file = './config/heuristic_filter_non-en.yaml'\n", + "os.makedirs(HF_output_path, exist_ok=True)\n", + "\n", + "# Load the filters from the YAML configuration file\n", + "filter_pipeline = build_filter_pipeline(filter_config_file)\n", + "\n", + "# Load the dataset\n", + "dataset = DocumentDataset.read_parquet(HF_input_data_dir, backend='pandas')\n", + "\n", + "# Suppress specific warnings during filtering\n", + "with warnings.catch_warnings():\n", + " warnings.simplefilter(\"ignore\", category=UserWarning)\n", + " # Apply the heuristic filters to the dataset\n", + " result_data = filter_pipeline(dataset)\n", + " \n", + " # Save the filtered dataset to disk\n", + " result_data.to_parquet(HF_output_path, write_to_filename=True)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Classifier-based quality filtering" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Classifier-based filtering uses a trained classifier model to sort content as high or low quality, offering a smarter and more flexible way to handle diverse datasets that simple rules might miss." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**Prepare Data for Training Classifier**\n", + "\n", + "To train a quality classifier, we need representative samples of both high-quality and low-quality content. For high-quality data, we use articles from Wikipedia's Vietnamese edition, which are generally well-structured and reliable. The low-quality samples come from unfiltered crawled Vietnamese news corpus." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Paths for high-quality and low-quality sample data\n", + "hq_samples_path = os.path.join(data_dir, 'classifier_filtering/train_samples/hq')\n", + "lq_samples_path = os.path.join(data_dir, 'classifier_filtering/train_samples/lq')\n", + "\n", + "\n", + "# Load and shard the high-quality dataset\n", + "ds = load_hf_dataset(\"wikimedia/wikipedia\", \"20231101.vi\")\n", + "num_shards = 8\n", + "for shard_idx in range(num_shards):\n", + " shard = ds['train'].shard(index=shard_idx, num_shards=num_shards)\n", + " shard.to_parquet(os.path.join(hq_samples_path, f\"{shard_idx}.parquet\"))\n", + "\n", + "\n", + "# Load and shard the low-quality dataset\n", + "ds = load_hf_dataset(\"vietgpt/binhvq_news_vi\")\n", + "num_shards = 32\n", + "for shard_idx in range(num_shards):\n", + " shard = ds['train'].shard(index=shard_idx, num_shards=num_shards)\n", + " shard.to_parquet(os.path.join(lq_samples_path, f\"{shard_idx}.parquet\"))\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**Training classifier**\n", + "\n", + "The classifier is trained using FastText, which offers an efficient and effective method for text classification. " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from nemo_curator.modifiers import FastTextLabelModifier\n", + "import fasttext\n", + "import random\n", + "\n", + "# Function to create labeled samples\n", + "def create_samples(data_path, label, num_samples):\n", + " raw_dataset = DocumentDataset.read_parquet(data_path, backend='pandas')\n", + " label_quality = Modify(FastTextLabelModifier(label))\n", + " labeled_dataset = label_quality(raw_dataset)\n", + " labeled_samples = labeled_dataset.df.sample(frac=num_samples / len(labeled_dataset.df))\n", + " return labeled_samples[\"text\"].compute().values.tolist()\n", + "\n", + "# Prepare training data\n", + "low_quality_samples = create_samples(lq_samples_path, \"__label__lq\", 100000)\n", + "high_quality_samples = create_samples(hq_samples_path, \"__label__hq\", 100000)\n", + "train_samples = low_quality_samples + high_quality_samples\n", + "random.shuffle(train_samples)\n", + "\n", + "# Save training data to a file\n", + "train_file = \"./cf_model_fasttext.train\"\n", + "with open(train_file, \"w\") as f:\n", + " for sample in train_samples:\n", + " f.write(sample + \"\\n\")\n", + "\n", + "# Train the FastText classifier\n", + "model = fasttext.train_supervised(input=train_file, lr=0.01, dim=100, epoch=5, wordNgrams=2)\n", + "model_path = \"./cf_model_fasttext_model.bin\"\n", + "model.save_model(model_path)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**Classify and filter the dataset**\n", + "\n", + "Once trained, the classifier is used to filter the dataset, categorizing documents into high and low quality based on the learned distinctions" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from nemo_curator.filters import FastTextQualityFilter\n", + "from nemo_curator import ScoreFilter\n", + "\n", + "# Define paths and load the dataset\n", + "CF_input_data_dir = HF_output_path\n", + "CF_output_path = os.path.join(data_dir, 'classifier_filtering/output')\n", + "target_dataset = DocumentDataset.read_parquet(CF_input_data_dir, 'parquet')\n", + "\n", + "# Set up the filtering pipeline\n", + "filter_pipeline = 
ScoreFilter(FastTextQualityFilter(model_path), score_field=\"quality_score\", score_type=float)\n", + "filtered_dataset = filter_pipeline(target_dataset)\n", + "\n", + "# Save the filtered dataset\n", + "write_to_disk(filtered_dataset.df, output_file_dir=CF_output_path, write_to_filename=True, output_type='parquet')\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Close the CPU Dask Cluster" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "client.cluster.close()\n", + "client.shutdown()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We have completed the notebook! For other techniques such as Fuzzy Deduplication or PII redaction, you can go to [NeMo Curator example scripts](https://github.com/NVIDIA/NeMo-Curator/tree/main/examples)." + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}