Upgrade to RAPIDS 24.10 #305

Closed · wants to merge 3 commits
1 change: 1 addition & 0 deletions docs/user-guide/cpuvsgpu.rst
@@ -64,6 +64,7 @@ The following NeMo Curator modules are GPU based.

* Exact Deduplication
* Fuzzy Deduplication
* Semantic Deduplication
* Distributed Data Classification

* Domain Classification
476 changes: 360 additions & 116 deletions docs/user-guide/gpudeduplication.rst

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions requirements/requirements_cuda12x.txt
@@ -1,6 +1,6 @@
-cudf-cu12>=24.8
-cugraph-cu12>=24.8
-cuml-cu12>=24.8
-dask-cuda>=24.8
-dask-cudf-cu12>=24.8
+cudf-cu12>=24.10
+cugraph-cu12>=24.10
+cuml-cu12>=24.10
+dask-cuda>=24.10
+dask-cudf-cu12>=24.10
 spacy[cuda12x]>=3.6.0, <3.8.0
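
After installing the bumped requirements, a quick sanity check (illustrative only, not part of this diff) can confirm that the RAPIDS 24.10 components import and report the expected versions; a minimal sketch:

```python
# Illustrative post-install check, not part of this PR: confirm the RAPIDS
# components pinned above import cleanly and report their versions.
import cudf
import cuml
import dask_cuda
import dask_cudf

for name, module in [("cudf", cudf), ("cuml", cuml),
                     ("dask-cuda", dask_cuda), ("dask-cudf", dask_cudf)]:
    print(f"{name}: {module.__version__}")
```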
1 change: 1 addition & 0 deletions tutorials/README.md
@@ -27,4 +27,5 @@ To get started, we recommend starting with the following tutorials to become fam
| [synthetic-preference-data](./synthetic-preference-data) | Demonstrates the use of NeMo Curator synthetic data generation modules to leverage [LLaMa 3.1 405B Instruct](https://build.nvidia.com/meta/llama-3_1-405b-instruct) for generating synthetic preference data |
| [synthetic-retrieval-evaluation](./synthetic-retrieval-evaluation) | Demonstrates the use of NeMo Curator synthetic data generation modules to leverage [LLaMa 3.1 405B Instruct](https://build.nvidia.com/meta/llama-3_1-405b-instruct) for generating synthetic data to evaluate retrieval pipelines |
| [tinystories](./tinystories) | A comprehensive example of curating a small dataset to use for model pre-training. | [Blog post](https://developer.nvidia.com/blog/curating-custom-datasets-for-llm-training-with-nvidia-nemo-curator/)
| [zyda2-tutorial](./zyda2-tutorial) | A comprehensive tutorial on how to reproduce the Zyda2 dataset. |
</div>
21 changes: 21 additions & 0 deletions tutorials/zyda2-tutorial/0_processing/helper.py
@@ -0,0 +1,21 @@
import os

from nemo_curator import AddId
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.file_utils import get_all_files_paths_under


def ensure_directory_exists(filename: str):
os.makedirs(os.path.dirname(filename), exist_ok=True)


def process_data(input_folder, output_folder, prefix, partition_size="512MB"):
raw_files = get_all_files_paths_under(input_folder)
raw_data = DocumentDataset.read_parquet(raw_files)
raw_data_rep = DocumentDataset(
raw_data.df.repartition(partition_size=partition_size)
)
add_id = AddId(id_field="nemo_id", id_prefix=prefix)
data_with_id = add_id(raw_data_rep)
ensure_directory_exists(output_folder)
data_with_id.to_parquet(output_folder)
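
For orientation, a minimal usage sketch of `process_data`; the paths and prefix below are hypothetical, and the `process_*` scripts that follow show the real invocations:

```python
# Hypothetical example only; see the process_* scripts below for real usage.
process_data(
    input_folder="/data/raw/example-dataset",          # folder of raw Parquet shards
    output_folder="/data/processed/example-dataset",   # destination Parquet with IDs
    prefix="example",                                   # prefix used for the "nemo_id" values
    partition_size="512MB",                             # target Dask partition size
)
```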
51 changes: 51 additions & 0 deletions tutorials/zyda2-tutorial/0_processing/process_dclm.py
@@ -0,0 +1,51 @@
import os

os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False"

import logging

from dask.distributed import Client, LocalCluster
from helper import process_data

logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO)

DATA_BASE = os.environ.get("DATA_BASE")
INPUT_BASE = os.path.join(DATA_BASE, "raw/dclm-baseline-1.0-parquet/filtered")
OUTPUT_BASE = os.path.join(DATA_BASE, "processed/dclm-baseline-1.0-parquet")
CPU_WORKERS = int(os.environ.get("CPU_WORKERS"))  # LocalCluster expects an integer worker count


if __name__ == "__main__":
logging.info("Starting Dask cluster")
cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, memory_limit="48GB")
client = Client(cluster)
logging.info(client)

components = [
"global-shard_01_of_10",
"global-shard_02_of_10",
"global-shard_03_of_10",
"global-shard_04_of_10",
"global-shard_05_of_10",
"global-shard_06_of_10",
"global-shard_07_of_10",
"global-shard_08_of_10",
"global-shard_09_of_10",
"global-shard_10_of_10",
]

for i, component in enumerate(components, start=1):
input_path = os.path.join(INPUT_BASE, component)
if not os.path.exists(input_path):
continue
output_path = os.path.join(OUTPUT_BASE, component)
logging.info(f"Processing {component}")
process_data(
input_folder=input_path,
output_folder=output_path,
prefix=f"dclm-gs{i}",
)
logging.info("Done!")

client.cluster.close()
client.shutdown()
29 changes: 29 additions & 0 deletions tutorials/zyda2-tutorial/0_processing/process_dolma_cc.py
@@ -0,0 +1,29 @@
import os

os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False"

import logging

from dask.distributed import Client, LocalCluster
from helper import process_data

logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO)

DATA_BASE = os.environ.get("DATA_BASE")
INPUT_BASE = os.path.join(DATA_BASE, "raw/dolma-v1_7-cc-parquet")
OUTPUT_BASE = os.path.join(DATA_BASE, "processed/dolma-v1_7-cc-parquet")
CPU_WORKERS = int(os.environ.get("CPU_WORKERS"))  # LocalCluster expects an integer worker count


if __name__ == "__main__":
logging.info("Starting Dask cluster")
cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, memory_limit="48GB")
client = Client(cluster)
logging.info(client)

logging.info(f"Processing Dolma-CC")
process_data(input_folder=INPUT_BASE, output_folder=OUTPUT_BASE, prefix="dolma-cc")
logging.info("Done!")

client.cluster.close()
client.shutdown()
82 changes: 82 additions & 0 deletions tutorials/zyda2-tutorial/0_processing/process_fwe2.py
@@ -0,0 +1,82 @@
import os

os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False"

import ctypes
import gc
import logging
from pathlib import Path

from dask.distributed import Client, LocalCluster
from helper import process_data

logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO)

DATA_BASE = os.environ.get("DATA_BASE")
INPUT_BASE = os.path.join(DATA_BASE, "raw/fineweb-edu-score-2/data")
OUTPUT_BASE = os.path.join(DATA_BASE, "processed/fineweb-edu-score-2")
CPU_WORKERS = int(os.environ.get("CPU_WORKERS"))  # LocalCluster expects an integer worker count


def trim_memory() -> int:
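    # Ask glibc's allocator to return freed heap memory to the OS; run on every
    # Dask worker via client.run() after each component to keep resident memory low.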
libc = ctypes.CDLL("libc.so.6")
return libc.malloc_trim(0)


def get_folder_size(folder_path):
return sum(
file.stat().st_size for file in Path(folder_path).rglob("*") if file.is_file()
)


def sort_folders_by_size(parent_directory):
folders = [
f
for f in os.listdir(parent_directory)
if os.path.isdir(os.path.join(parent_directory, f))
]
folder_sizes = [
(folder, get_folder_size(os.path.join(parent_directory, folder)))
for folder in folders
]
return sorted(folder_sizes, key=lambda x: x[1])


def bytes_to_human_readable(size_in_bytes):
suffixes = ["B", "KB", "MB", "GB", "TB", "PB"]
suffix_index = 0
size = float(size_in_bytes)
while size >= 1024 and suffix_index < len(suffixes) - 1:
size /= 1024.0
suffix_index += 1
return f"{size:.2f} {suffixes[suffix_index]}"


if __name__ == "__main__":
logging.info("Starting Dask cluster")
cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, memory_limit="240GB")
client = Client(cluster)
logging.info(client)

components_with_sizes = sort_folders_by_size(INPUT_BASE)

for component, component_size in components_with_sizes:
input_path = os.path.join(INPUT_BASE, component)
if not os.path.exists(input_path) or not os.path.isdir(input_path):
continue
output_path = os.path.join(OUTPUT_BASE, component)
logging.info(
f"Processing {component}, size = {bytes_to_human_readable(component_size)}"
)
process_data(
input_folder=input_path,
output_folder=output_path,
prefix=f"fwe2-{component}",
)
logging.info("Trimming memory")
gc.collect()
client.run(trim_memory)
logging.info("Done!")

client.cluster.close()
client.shutdown()
45 changes: 45 additions & 0 deletions tutorials/zyda2-tutorial/0_processing/process_zyda.py
@@ -0,0 +1,45 @@
import os

os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False"

import logging

from dask.distributed import Client, LocalCluster
from helper import process_data

logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO)

DATA_BASE = os.environ.get("DATA_BASE")
INPUT_BASE = os.path.join(DATA_BASE, "raw/data/zyda_no_starcoder")
OUTPUT_BASE = os.path.join(DATA_BASE, "processed/zyda-parquet")
CPU_WORKERS = int(os.environ.get("CPU_WORKERS"))  # LocalCluster expects an integer worker count


if __name__ == "__main__":
logging.info("Starting Dask cluster")
cluster = LocalCluster(n_workers=CPU_WORKERS, processes=True, memory_limit="48GB")
client = Client(cluster)
logging.info(client)

components = [
"zyda_arxiv",
"zyda_peS2o",
"zyda_pile-uncopyrighted",
"zyda_slimpajama",
"zyda_c4-en",
"zyda_refinedweb",
]

for component in components:
input_path = os.path.join(INPUT_BASE, component)
if not os.path.exists(input_path):
continue
output_path = os.path.join(OUTPUT_BASE, component)
logging.info(f"Processing {component}")
process_data(
input_folder=input_path, output_folder=output_path, prefix=component
)
logging.info("Done!")

client.cluster.close()
client.shutdown()
67 changes: 67 additions & 0 deletions tutorials/zyda2-tutorial/1_fuzzy_dedup/0_minhash.py
@@ -0,0 +1,67 @@
import os

os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False"

import logging
import time

import dask_cudf

from nemo_curator import MinHash
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import get_client, get_num_workers
from nemo_curator.utils.file_utils import get_all_files_paths_under

logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO)


def read_folder(input_folder, columns=["nemo_id", "text"]):
data_paths = get_all_files_paths_under(input_folder)
data_paths = [f for f in data_paths if f.endswith(".parquet")]
data_paths.sort()
logging.info(f"Number of files being read: {len(data_paths)}")
text_ddf = dask_cudf.read_parquet(
data_paths,
columns=columns,
)
return text_ddf


DATA_BASE = os.environ.get("DATA_BASE")
SCHEDULER_FILE = os.environ.get("SCHEDULER_FILE")


if __name__ == "__main__":
client = get_client(scheduler_file=SCHEDULER_FILE)
logging.info(f"Number of dask workers: {get_num_workers(client)}")

minhash_base_output_path = os.path.join(DATA_BASE, "fuzzy/minhash")
minhash_output_dir = os.path.join(minhash_base_output_path, "data")

# Relevant parameters
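    # 128 MinHash values are computed per document over 25-character n-grams of
    # the text field; use_64bit_hash=False keeps the compact 32-bit hash variant.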
minhash_id_field = "nemo_id"
minhash_text_field = "text"
seed = 10
minhash_length = 128
char_ngram = 25
use_64bit_hash = False

# Reading all the data
text_ddf = read_folder(
input_folder=os.path.join(DATA_BASE, "processed"),
columns=[minhash_id_field, minhash_text_field],
)

# Computing minhashes
t0 = time.time()
minhasher = MinHash(
seed=seed,
num_hashes=minhash_length,
char_ngrams=char_ngram,
use_64bit_hash=use_64bit_hash,
id_field=minhash_id_field,
text_field=minhash_text_field,
cache_dir=minhash_output_dir,
)
res = minhasher(DocumentDataset(text_ddf)).df
logging.info(f"Time taken for MinHash: {time.time()-t0:.2f}sec.")
69 changes: 69 additions & 0 deletions tutorials/zyda2-tutorial/1_fuzzy_dedup/1_lsh.py
@@ -0,0 +1,69 @@
import os

os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False"

import logging
import time

import cudf
import dask_cudf
import numpy as np

from nemo_curator import LSH
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.distributed_utils import get_client, get_num_workers
from nemo_curator.utils.fuzzy_dedup_utils.id_mapping import convert_str_id_to_int

logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO)


DATA_BASE = os.environ.get("DATA_BASE")
SCHEDULER_FILE = os.environ.get("SCHEDULER_FILE")


if __name__ == "__main__":
client = get_client(scheduler_file=SCHEDULER_FILE)
logging.info(f"Number of dask workers: {get_num_workers(client)}")

minhash_base_output_path = os.path.join(DATA_BASE, "fuzzy/minhash")
minhash_output_dir = os.path.join(minhash_base_output_path, "data")

# Input
lsh_input_data_path = minhash_output_dir

# Output
lsh_base_output_path = os.path.join(DATA_BASE, "fuzzy/lsh")
lsh_output_dir = os.path.join(lsh_base_output_path, "data")

# Relevant parameters
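    # The 128-value MinHash signature is split into 8 bands (num_buckets) of
    # 128 / 8 = 16 hashes each; documents that collide in any band share a bucket
    # and become fuzzy-duplicate candidates. buckets_per_shuffle trades memory
    # for the number of shuffle passes.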
lsh_id_field = "nemo_id"
minhash_field = "_minhash_signature"
minhash_length = 128
num_bands = 8
buckets_per_shuffle = 8

t0 = time.time()

# Load MinHash output
logging.info("Converting ids")
df = dask_cudf.read_parquet(lsh_input_data_path, backend="cudf")
df = df.map_partitions(
convert_str_id_to_int,
id_column=lsh_id_field,
meta=cudf.DataFrame(
{minhash_field: [[1, 2, 3]], "doc_id": [1], "dataset_id": np.uint32(1)}
),
)
# Run LSH()
lsh = LSH(
cache_dir=lsh_output_dir,
num_hashes=minhash_length,
num_buckets=num_bands,
buckets_per_shuffle=buckets_per_shuffle,
id_fields=["dataset_id", "doc_id"],
minhash_field=minhash_field,
)
res = lsh(DocumentDataset(df))

t1 = time.time()
logging.info(f"Time taken for LSH: {time.time() - t0} s")