Add support for NeMo SDK #131

Merged
13 commits merged on Jul 9, 2024
4 changes: 4 additions & 0 deletions docs/user-guide/index.rst
@@ -30,6 +30,9 @@
:ref:`NeMo Curator on Kubernetes <data-curator-kubernetes>`
Demonstration of how to run the NeMo Curator on a Dask Cluster deployed on top of Kubernetes

:ref:`NeMo Curator with NeMo SDK <data-curator-nemo-sdk>`
Example of how to use NeMo Curator with NeMo SDK to run on various platforms

`Tutorials <https://github.com/NVIDIA/NeMo-Curator/tree/main/tutorials>`__
To get started, you can explore the NeMo Curator GitHub repository and follow the available tutorials and notebooks. These resources cover various aspects of data curation, including training from scratch and Parameter-Efficient Fine-Tuning (PEFT).

@@ -49,3 +52,4 @@
personalidentifiableinformationidentificationandremoval.rst
distributeddataclassification.rst
kubernetescurator.rst
nemosdk.rst
127 changes: 127 additions & 0 deletions docs/user-guide/nemosdk.rst
@@ -0,0 +1,127 @@
.. _data-curator-nemo-sdk:

======================================
NeMo Curator with NeMo SDK
======================================
-----------------------------------------
NeMo SDK
-----------------------------------------

The NeMo SDK is a general-purpose tool for configuring and executing Python functions and scripts across various computing environments.
It is used across the NeMo Framework for managing machine learning experiments.
One of the key features of the NeMo SDK is the ability to run code locally or on platforms like SLURM with minimal changes.

-----------------------------------------
Usage
-----------------------------------------

We recommend becoming somewhat familiar with the NeMo SDK before diving in. Its documentation can be found here.

Let's walk through an example of how to launch a SLURM job using `examples/nemo_sdk/launch_slurm.py <https://github.com/NVIDIA/NeMo-Curator/blob/main/examples/nemo_sdk/launch_slurm.py>`_.

.. code-block:: python


import nemo_sdk as sdk
from nemo_sdk.core.execution import SlurmExecutor

from nemo_curator.nemo_sdk import SlurmJobConfig

@sdk.factory
def nemo_curator_slurm_executor() -> SlurmExecutor:
    """
    Configure the following function with the details of your SLURM cluster
    """
    return SlurmExecutor(
        job_name_prefix="nemo-curator",
        account="my-account",
        nodes=2,
        exclusive=True,
        time="04:00:00",
        container_image="nvcr.io/nvidia/nemo:dev",
        container_mounts=["/path/on/machine:/path/in/container"],
    )

First, we need to define a factory that can produce a ``SlurmExecutor``.
This executor is where you define all your cluster parameters. Note: NeMo SDK currently only supports running on SLURM clusters with `Pyxis <https://github.com/NVIDIA/pyxis>`_.
After this comes the main function:

.. code-block:: python

# Path to NeMo-Curator/examples/slurm/container-entrypoint.sh on the SLURM cluster
container_entrypoint = "/cluster/path/slurm/container_entrypoint.sh"
# The NeMo Curator command to run
curator_command = "text_cleaning --input-data-dir=/path/to/data --output-clean-dir=/path/to/output"
curator_job = SlurmJobConfig(
    job_dir="/home/user/jobs",
    container_entrypoint=container_entrypoint,
    script_command=curator_command,
)

First, we need to specify the path to `examples/slurm/container-entrypoint.sh <https://github.com/NVIDIA/NeMo-Curator/blob/main/examples/slurm/container-entrypoint.sh>`_ on the cluster.
This shell script is responsible for setting up the Dask cluster on Slurm and is the main script that will be run, which is why we need to point to its location on the cluster.

Second, we need to establish the NeMo Curator script we want to run.
This can be a command line utility like the ``text_cleaning`` command above, or it can be your own custom script run with ``python path/to/script.py``.
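
For example, a hypothetical custom script could be wrapped the same way (the path and flag below are purely illustrative):

.. code-block:: python

    # Any script that accepts the --scheduler-file and --device arguments works here
    curator_command = "python /path/to/my_curation_script.py --input-data-dir=/path/to/data"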


Finally, we combine all of these into a ``SlurmJobConfig``. This config has many options for configuring the Dask cluster.
We'll highlight a couple of important ones:

* ``device="cpu"`` determines the type of Dask cluster to initialize. If you are using GPU modules, please set this equal to ``"gpu"``.
* ``interface="etho0"`` specifies the network interface to use for communication within the Dask cluster. It will likely be different for your Slurm cluster, so please modify as needed. You can determine what interfaces are available by running the following function on your cluster.

.. code-block:: python

from nemo_curator import get_network_interfaces

print(get_network_interfaces())
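
The name you choose can then be passed through the ``interface`` field of the job config; ``ib0`` below is just an illustration, not a recommendation:

.. code-block:: python

    curator_job = SlurmJobConfig(
        job_dir="/home/user/jobs",
        container_entrypoint=container_entrypoint,
        script_command=curator_command,
        interface="ib0",  # replace with an interface reported on your cluster
    )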

.. code-block:: python

executor = sdk.resolve(SlurmExecutor, "nemo_curator_slurm_executor")
with sdk.Experiment("example_nemo_curator_exp", executor=executor) as exp:
    exp.add(curator_job.to_script(), tail_logs=True)
    exp.run(detach=False)

After configuring the job, we can finally run it.
First, we use the SDK to resolve our custom factory.
Next, we use it to begin an experiment named "example_nemo_curator_exp" running on our Slurm executor.

``exp.add(curator_job.to_script(), tail_logs=True)`` adds the NeMo Curator script to the experiment.
It converts the ``SlurmJobConfig`` into an ``sdk.Script``.
``curator_job.to_script()`` accepts two important parameters:

* ``add_scheduler_file=True``
* ``add_device=True``

Both of these modify the command specified in ``curator_command``.
Setting both to ``True`` (the default) transforms the original command from:

.. code-block:: bash

# Original command
text_cleaning \
--input-data-dir=/path/to/data \
--output-clean-dir=/path/to/output

to:

.. code-block:: bash

# Modified command
text_cleaning \
--input-data-dir=/path/to/data \
--output-clean-dir=/path/to/output \
--scheduler-file=/path/to/scheduler/file \
--device="cpu"


As you can see, ``add_scheduler_file=True`` causes ``--scheduler-file=/path/to/scheduler/file`` to be appended to the command, and ``add_device=True`` causes ``--device="cpu"`` (or whatever the device is set to) to be appended.
``/path/to/scheduler/file`` is determined by the ``SlurmJobConfig``, and the device is whatever was specified in the ``device`` parameter earlier.

The scheduler file argument is necessary to connect to the Dask cluster on Slurm.
All NeMo Curator scripts accept both arguments, so the default is to automatically add them.
If your script is configured differently, feel free to turn these off.
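
For instance, if your script manages its own Dask cluster, a minimal sketch of turning both off (reusing the ``curator_job`` and ``exp`` objects from above) would be:

.. code-block:: python

    # The command in script_command is passed through unchanged;
    # no --scheduler-file or --device argument is appended.
    script = curator_job.to_script(add_scheduler_file=False, add_device=False)
    exp.add(script, tail_logs=True)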

The final line ``exp.run(detach=False)`` starts the experiment on the Slurm cluster.
56 changes: 56 additions & 0 deletions examples/nemo_sdk/launch_slurm.py
@@ -0,0 +1,56 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import nemo_sdk as sdk
from nemo_sdk.core.execution import SlurmExecutor

from nemo_curator.nemo_sdk import SlurmJobConfig


@sdk.factory
def nemo_curator_slurm_executor() -> SlurmExecutor:
"""
Configure the following function with the details of your SLURM cluster
"""
return SlurmExecutor(
job_name_prefix="nemo-curator",
account="my-account",
nodes=2,
exclusive=True,
time="04:00:00",
container_image="nvcr.io/nvidia/nemo:dev",
container_mounts=["/path/on/machine:/path/in/container"],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is maybe a question for Nemo_sdk (apologies for my lack of familiarity). Can users pass in additional args here for other slurm options?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haha no need to apologize for your lack of familiarity. Yes the user can pass in additional args.

)


def main():
    # Path to NeMo-Curator/examples/slurm/container-entrypoint.sh on the SLURM cluster
    container_entrypoint = "/cluster/path/slurm/container_entrypoint.sh"
    # The NeMo Curator command to run
    # This command can be substituted with any NeMo Curator command
    curator_command = "text_cleaning --input-data-dir=/path/to/data --output-clean-dir=/path/to/output"
    curator_job = SlurmJobConfig(
        job_dir="/home/user/jobs",
        container_entrypoint=container_entrypoint,
        script_command=curator_command,
    )

    executor = sdk.resolve(SlurmExecutor, "nemo_curator_slurm_executor")
    with sdk.Experiment("example_nemo_curator_exp", executor=executor) as exp:
        exp.add(curator_job.to_script(), tail_logs=True)
        exp.run(detach=False)


if __name__ == "__main__":
    main()
8 changes: 7 additions & 1 deletion examples/slurm/container-entrypoint.sh
@@ -16,6 +16,12 @@

# Start the scheduler on the rank 0 node
if [[ -z "$SLURM_NODEID" ]] || [[ $SLURM_NODEID == 0 ]]; then
# Make the directories needed
echo "Making log directory $LOGDIR"
mkdir -p $LOGDIR
echo "Making profile directory $PROFILESDIR"
mkdir -p $PROFILESDIR

echo "Starting scheduler"
if [[ $DEVICE == 'cpu' ]]; then
dask scheduler \
@@ -58,7 +64,7 @@ fi
sleep 60

if [[ -z "$SLURM_NODEID" ]] || [[ $SLURM_NODEID == 0 ]]; then
echo "Starting $SCRIPT_PATH"
echo "Starting $SCRIPT_COMMAND"
bash -c "$SCRIPT_COMMAND"
touch $DONE_MARKER
fi
6 changes: 2 additions & 4 deletions examples/slurm/start-slurm.sh
@@ -28,7 +28,8 @@
export BASE_JOB_DIR=`pwd`/nemo-curator-jobs
export JOB_DIR=$BASE_JOB_DIR/$SLURM_JOB_ID

# Logging information
# Directory for Dask cluster communication and logging
# Must be paths inside the container that are accessible across nodes
export LOGDIR=$JOB_DIR/logs
export PROFILESDIR=$JOB_DIR/profiles
export SCHEDULER_FILE=$LOGDIR/scheduler.json
@@ -74,9 +75,6 @@ export DASK_DATAFRAME__QUERY_PLANNING=False
# End easy customization
# =================================================================

mkdir -p $LOGDIR
mkdir -p $PROFILESDIR

# Start the container
srun \
--container-mounts=${MOUNTS} \
2 changes: 1 addition & 1 deletion nemo_curator/__init__.py
@@ -41,7 +41,7 @@
    NemoDeployClient,
    OpenAIClient,
)
from .utils.distributed_utils import get_client
from .utils.distributed_utils import get_client, get_network_interfaces

# Dask will automatically convert the list score type
# to a string without this option.
17 changes: 17 additions & 0 deletions nemo_curator/nemo_sdk/__init__.py
@@ -0,0 +1,17 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .slurm import SlurmJobConfig

__all__ = ["SlurmJobConfig"]
110 changes: 110 additions & 0 deletions nemo_curator/nemo_sdk/slurm.py
@@ -0,0 +1,110 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass
from typing import Dict

from nemo_curator.utils.import_utils import safe_import

sdk = safe_import("nemo_sdk")


@dataclass
class SlurmJobConfig:
Review comment (Collaborator): Also tagging @jacobtomlinson who's done a lot of work on the dask/dask-cuda clusters with Slurm (among other things). For now this mimics the command line setup to start clusters, but feel free to share any opinions you might have since this overlaps a lot with the dask-runners/dask-jobqueue API.

Reply (Member): Thanks @ayushdg. This seems to follow the common pattern that a lot of Slurm implementations use, so I don't have any particular comments. I'm always keen to see how we can reuse code though, so maybe we could work towards a common base in dask-jobqueue that projects like this can use instead of reinventing it each time.
"""
Configuration for running a NeMo Curator script on a SLURM cluster using
NeMo SDK

Args:
job_dir: The base directory where all the files related to setting up
the Dask cluster for NeMo Curator will be written
container_entrypoint: A path to the container-entrypoint.sh script
on the cluster. container-entrypoint.sh is found in the repo
here: https://github.com/NVIDIA/NeMo-Curator/blob/main/examples/slurm/container-entrypoint.sh
script_command: The NeMo Curator CLI tool to run. Pass any additional arguments
needed directly in this string.
device: The type of script that will be running, and therefore the type
of Dask cluster that will be created. Must be either "cpu" or "gpu".
interface: The network interface the Dask cluster will communicate over.
Use nemo_curator.get_network_interfaces() to get a list of available ones.
protocol: The networking protocol to use. Can be either "tcp" or "ucx".
Setting to "ucx" is recommended for GPU jobs if your cluster supports it.
cpu_worker_memory_limit: The maximum memory per process that a Dask worker can use.
"5GB" or "5000M" are examples. "0" means no limit.
rapids_no_initialize: Will delay or disable the CUDA context creation of RAPIDS libraries,
allowing for improved compatibility with UCX-enabled clusters and preventing runtime warnings.
cudf_spill: Enables automatic spilling (and “unspilling”) of buffers from device to host to
enable out-of-memory computation, i.e., computing on objects that occupy more memory
than is available on the GPU.
rmm_scheduler_pool_size: Sets a small pool of GPU memory for message transfers when
the scheduler is using ucx
rmm_worker_pool_size: The amount of GPU memory each GPU worker process may use.
Recommended to set at 80-90% of available GPU memory. 72GiB is good for A100/H100
libcudf_cufile_policy: Allows reading/writing directly from storage to GPU.
"""

job_dir: str
container_entrypoint: str
script_command: str
device: str = "cpu"
interface: str = "eth0"
protocol: str = "tcp"
cpu_worker_memory_limit: str = "0"
rapids_no_initialize: str = "1"
cudf_spill: str = "1"
rmm_scheduler_pool_size: str = "1GB"
rmm_worker_pool_size: str = "72GiB"
libcudf_cufile_policy: str = "OFF"
Review comment (Collaborator): Similar to my other comment, I often have trouble knowing what to set for these types of parameters. Is there anywhere the user might be able to refer to for recommendations of how to set these parameters for their specific cluster?

Reply (Collaborator, Author): I agree we should release a bigger guide on our recommendations for each parameter. For now I've included a docstring that should provide a bit more context. Let me know if you want me to change anything else to make it clearer.

    def to_script(self, add_scheduler_file: bool = True, add_device: bool = True):
        """
        Converts to a script object executable by NeMo SDK

        Args:
            add_scheduler_file: Automatically appends a '--scheduler-file' argument to the
                script_command where the value is job_dir/logs/scheduler.json. All
                scripts included in NeMo Curator accept and require this argument to scale
                properly on SLURM clusters.
            add_device: Automatically appends a '--device' argument to the script_command
                where the value is the member variable of device. All scripts included in
                NeMo Curator accept and require this argument.

        Returns:
            A NeMo SDK Script that will initialize a Dask cluster and run the specified command.
            It is designed to be executed on a SLURM cluster.
        """
        env_vars = self._build_env_vars()

        if add_scheduler_file:
            env_vars[
                "SCRIPT_COMMAND"
            ] += f" --scheduler-file={env_vars['SCHEDULER_FILE']}"
        if add_device:
            env_vars["SCRIPT_COMMAND"] += f" --device={env_vars['DEVICE']}"

        # Surround the command in quotes so the variable gets set properly
        env_vars["SCRIPT_COMMAND"] = f"\"{env_vars['SCRIPT_COMMAND']}\""

        return sdk.Script(path=self.container_entrypoint, env=env_vars)

    def _build_env_vars(self) -> Dict[str, str]:
        env_vars = vars(self)
        # Convert to uppercase to match container-entrypoint.sh
        env_vars = {key.upper(): val for key, val in env_vars.items()}

        env_vars["LOGDIR"] = f"{self.job_dir}/logs"
        env_vars["PROFILESDIR"] = f"{self.job_dir}/profiles"
        env_vars["SCHEDULER_FILE"] = f"{env_vars['LOGDIR']}/scheduler.json"
        env_vars["SCHEDULER_LOG"] = f"{env_vars['LOGDIR']}/scheduler.log"
        env_vars["DONE_MARKER"] = f"{env_vars['LOGDIR']}/done.txt"

        return env_vars