Adding an example for executing NeMo modules using kubernetes Python … #148

Open
wants to merge 3 commits into base: main
229 changes: 229 additions & 0 deletions docs/user-guide/aws-examples/eks.rst
ryantwolf marked this conversation as resolved.
@@ -0,0 +1,229 @@
======================================
Running NeMo Curator on AWS EKS
======================================

--------------------------------------
Background
--------------------------------------
AWS EKS is a fully managed service that makes it easier to run Kubernetes on AWS without needing to install, operate, and maintain your own Kubernetes control plane.

Running NeMo Curator on AWS EKS offers streamlined Kubernetes management integrated with AWS services like CloudWatch for enhanced monitoring and logging, and native auto-scaling capabilities.

For more details, refer to the `EKS documentation <https://docs.aws.amazon.com/eks/latest/userguide/what-is-eks.html>`__.

This guide covers all essential prerequisites. It includes an example demonstrating how to create an EFS StorageClass and offers step-by-step instructions for setting up an EFS Persistent Volume Claim to dynamically provision a Kubernetes Persistent Volume. Furthermore, it outlines the required steps to deploy a Dask cluster and delves into utilizing the Kubernetes Python client library to assign NeMo-Curator tasks to the Dask scheduler.
Review comment (Collaborator): Nit: "assign NeMo-Curator tasks to the Dask scheduler" -> "run NeMo Curator scripts"

Prerequisites:
----------------

* EKS Cluster:

  * `Dask Operator <https://kubernetes.dask.org/en/latest/installing.html>`__
  * If a self-managed node group is created with Ubuntu worker nodes, install the `GPU Operator <https://docs.nvidia.com/datacenter/cloud-native/gpu-operator/latest/getting-started.html>`__. The GPU Operator is highly recommended because it simplifies the deployment and management of NVIDIA GPU resources within Kubernetes clusters: it automates the installation of NVIDIA drivers, integrates with container runtimes like containerd through the NVIDIA Container Toolkit, manages the device plugin, and provides monitoring capabilities.
  * If an EKS-managed node group is created with Amazon Linux 2 worker nodes, install the `NVIDIA device plugin <https://docs.aws.amazon.com/eks/latest/userguide/eks-optimized-ami.html>`__. This approach has a limitation: the pre-installed NVIDIA GPU driver and NVIDIA container runtime versions lag NVIDIA's release schedule, and you assume the responsibility of upgrading the NVIDIA device plugin version.
    For more details, please refer to `NVIDIA GPU Operator with Amazon EKS <https://docs.nvidia.com/datacenter/cloud-native/gpu-operator/latest/amazon-eks.html>`__.
    Example installation commands for the Dask Operator and the GPU Operator are sketched after this list.

* Storage:

  * `EFS for EKS <https://github.com/kubernetes-sigs/aws-efs-csi-driver/blob/master/docs/efs-create-filesystem.md>`__ (set up by the Kubernetes cluster admin)
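
The Dask Operator and GPU Operator are typically installed with Helm. The following commands are a sketch based on the upstream documentation (chart names and flags may change between releases, so consult the linked docs for the current instructions):

.. code-block:: bash

   # Install the Dask Kubernetes operator, which manages DaskCluster resources
   helm install --repo https://helm.dask.org \
     --create-namespace -n dask-operator \
     --generate-name dask-kubernetes-operator

   # Install the NVIDIA GPU Operator (for self-managed Ubuntu node groups)
   helm repo add nvidia https://helm.ngc.nvidia.com/nvidia && helm repo update
   helm install --wait --generate-name \
     -n gpu-operator --create-namespace \
     nvidia/gpu-operator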

Create a Storage Class for AWS EFS
----------------------------------

.. code-block:: yaml

   cat <<EOF | kubectl apply -f -
   kind: StorageClass
   apiVersion: storage.k8s.io/v1
   metadata:
     name: efs-sc
   provisioner: efs.csi.aws.com
   parameters:
     provisioningMode: efs-ap
     fileSystemId: ${FileSystemId} # Replace with your actual FileSystemId
     directoryPerms: "700"
   EOF

In the above YAML:

- Replace `${FileSystemId}` with the actual EFS FileSystemId from AWS.
- `provisioner: efs.csi.aws.com` tells Kubernetes to provision volumes through the AWS EFS CSI driver.
- `provisioningMode: efs-ap` provisions each volume as an EFS access point on that file system.
- `directoryPerms: "700"` sets the permissions of the directory created for each access point.
Review comment (Collaborator): Thanks for adding this. Do you mind also explaining what each of the other yaml parameters do as well?

- This definition sets up a StorageClass named `efs-sc` that provisions PersistentVolumes using the AWS EFS CSI Driver.
Review comment (Collaborator): Could you move this bullet and expand it into a paragraph or two before this command? I'd appreciate if you could answer these questions as well:

  * What is a Storage Class?
  * Where should this command be run?
  * Why do we need to make a Storage Class?

PVC Definition
--------------------------------
Now, we can use the storage class created in the previous step to create a PersistentVolumeClaim (PVC). A PVC is a request for storage that pods can mount; with the `efs-sc` StorageClass, the EFS CSI driver dynamically provisions a PersistentVolume backed by EFS to satisfy the claim.
Review comment (Collaborator): Could you elaborate on what a PVC is and why we need to make it? I have a vague idea from the name "PersistentVolumeClaim", but some more details would be nice.

.. code-block:: yaml

   cat <<EOF | kubectl apply -f -
   apiVersion: v1
   kind: PersistentVolumeClaim
   metadata:
     name: nemo-workspace
   spec:
     accessModes:
       - ReadWriteMany
     storageClassName: efs-sc
     resources:
       requests:
         storage: 150Gi
   EOF

Review comment (ryantwolf, Jul 19, 2024): Not too familiar with EKS, but should this be 150GiB instead?

Review comment (Collaborator): Great, thanks for checking.

This PVC requests 150GiB of storage with ReadWriteMany access mode from the efs-sc StorageClass.
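
You can optionally confirm that the claim was created:

.. code-block:: bash

   kubectl get pvc nemo-workspace

Its STATUS column typically shows ``Bound`` once the EFS CSI driver provisions the backing volume (depending on the StorageClass binding mode, this may not happen until a pod mounts the claim).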

Dask cluster creation:
----------------------

Review comment (Collaborator): Nit: Change to "Dask cluster creation:" -> "Dask Cluster Creation"

Please refer to index.rst for instructions on creating a Docker secret to use the NeMo image and on uploading data to the PVC created in the previous step.
Review comment (Collaborator): Can you fix this link and also take into account my other feedback about calling out the relevant sections?

The Python environment can be set up by executing the following commands:

.. code-block:: bash

   python3 -m venv venv
   source venv/bin/activate

   pip install dask_kubernetes
   pip install kubernetes

The environment for running the provided scripts needs only the dask-kubernetes and kubernetes packages.

.. code-block:: bash

   python3 examples/k8s/create_dask_cluster.py \
     --name dask-gpu-cluster \
     --n_workers 2 \
     --image nvcr.io/nvidian/nemo:nightly \
     --image_pull_secret ngc-registry \
     --pvcs nemo-workspace:/nemo-workspace

The above command uses the create_dask_cluster.py script to create a cluster named dask-gpu-cluster with 2 GPU Dask workers and the nemo-workspace PVC attached.

After the cluster is created, you can check if the scheduler and worker pods are running by executing:

.. code-block:: bash

   kubectl get pods

The output will look as follows:

+---------------------------------------------------------+-------+---------+----------+------+
| NAME | READY | STATUS | RESTARTS | AGE |
+---------------------------------------------------------+-------+---------+----------+------+
| dask-kubernetes-operator-1720671237-6f8c579d4d-gk8pg | 1/1 | Running | 0 | 27h |
+---------------------------------------------------------+-------+---------+----------+------+
| rapids-dask-default-worker-be7c9e6b19-668b8cc459-cxcwg | 1/1 | Running | 0 | 21h |
+---------------------------------------------------------+-------+---------+----------+------+
| rapids-dask-default-worker-f4b5c0ff1a-66db8c4cb5-w68gd | 1/1 | Running | 0 | 21h |
+---------------------------------------------------------+-------+---------+----------+------+
| rapids-dask-scheduler-5dfc446f-9tw2t | 1/1 | Running | 0 | 21h |
+---------------------------------------------------------+-------+---------+----------+------+
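
To inspect the cluster while it runs, you can optionally port-forward the Dask dashboard to your workstation. The Service name below is an assumption based on the Dask operator's usual ``<cluster-name>-scheduler`` naming; confirm it first with ``kubectl get svc``.

.. code-block:: bash

   kubectl get svc   # find the scheduler Service name
   kubectl port-forward svc/dask-gpu-cluster-scheduler 8787:8787
   # Open http://localhost:8787 in a browser to view the Dask dashboard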



Use Kubernetes Python client library to submit NeMo-Curator jobs to the Dask scheduler:
------------------------------------------------------------------------------------------

Review comment (Collaborator): Nit: "Use Kubernetes Python client library to submit NeMo-Curator jobs to the Dask scheduler:" -> "Use Kubernetes Python client library to run NeMo Curator scripts"

In this method, we programmatically connect to the scheduler pod using the Kubernetes Python client library to execute the existing NeMo curator modules.
Review comment (Collaborator): Nit: "curator" -> "Curator"

This approach can be used when employing another wrapper or service to submit jobs to the Dask cluster in a distributed manner.
Review comment (ryantwolf, Jul 19, 2024): I think you can delete this line ("submit jobs").

  Also, I just want to clarify the purpose of Dask real quick since I see this phrasing of "jobs" and "submitting jobs" a few times in the docs. Dask is not like SLURM or another job scheduler. It is a framework that automatically handles scaling Python code across multiple processes or nodes. Here is a simple example of Python code that uses Dask:

    import dask.dataframe as dd

    # Create a Dask DataFrame
    df = dd.read_csv('large_dataset.csv')

    # Perform operations (these are executed in parallel)
    result = df.groupby('column').mean().compute()

  All of the computation you see will be executed on the Dask cluster without the need for explicit job submission or anything else. You should think of Dask as being more like PyTorch than SLURM. I'd recommend looking at the 10-minutes to Dask page for more details.


1) To execute existing NeMo curator modules in a scheduler pod from outside the EKS cluster, run the following:
Review comment (Collaborator): Can you change the numbers to use #. like follows

    #. To execute existing NeMo Curator modules in a scheduler pod from outside the EKS cluster, run the following:

  This will give nice formatting to the list.

  Nit: "NeMo curator" -> "NeMo Curator"

.. code-block:: bash
Review comment (Collaborator): Can you add 2 spaces to the beginning of this line? It should cause this code to be at the same indentation as the list

   python3 examples/k8s/kubeclient.py \
     --command "add_id --scheduler-address localhost:8786 --input-data-dir=/nemo-workspace/arxiv --output-data-dir=/nemo-workspace/arxiv-addid/" \
     --kubeconfig "~/.kube/config"
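
For orientation, kubeclient.py is roughly automating the following manual steps (a sketch; the label selector matches the one used in kubeclient.py, and pod names will differ in your cluster):

.. code-block:: bash

   # Find the Dask scheduler pod by the label the Dask operator applies to it
   SCHEDULER_POD=$(kubectl get pods -l dask.org/component=scheduler \
     -o jsonpath='{.items[0].metadata.name}')

   # Run the NeMo Curator module inside the scheduler pod
   kubectl exec "$SCHEDULER_POD" -- add_id \
     --scheduler-address localhost:8786 \
     --input-data-dir=/nemo-workspace/arxiv \
     --output-data-dir=/nemo-workspace/arxiv-addid/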
Review comment (Collaborator): Can you reformat the command on multiple lines so it's more readable? Something like

    python3 examples/k8s/kubeclient.py \
      --command "add_id --scheduler-address localhost:8786 --input-data-dir=/nemo-workspace/arxiv --output-data-dir=/nemo-workspace/arxiv-addid/" \
      --kubeconfig "~/.kube/config"

In this context, the --kubeconfig parameter is used so that the Kubernetes Python client library automatically loads its configuration settings from "~/.kube/config".
Review comment (Collaborator): Can you elaborate on what configuration settings it loads? Also, when is this file created?

Note: The default location of kubeconfig is $HOME/.kube/config. You can verify this by running:
Review comment (Collaborator): Do you mind using the .. note:: formatting? The code would look something like this:

    .. note::
      The default location of kubeconfig is $HOME/.kube/.config. You can verify this by running:

      .. code-block:: bash

        kubectl get pod   -v6 2>&1 |awk  '/Config loaded from file:/{print $NF}'

      ``-v6`` sets the verbose level to see the kubeconfig file in use.

.. code-block:: bash

   kubectl get pod -v6 2>&1 | awk '/Config loaded from file:/{print $NF}'

``-v6`` sets the verbose level to see the kubeconfig file in use.


2) To execute existing NeMo curator modules in a scheduler pod from another pod within the EKS cluster, add necessary permissions, such as pods/exec, and spin up a client pod.
Review comment (Collaborator): Nit: "NeMo curator" -> "NeMo Curator"

This approach allows the execution of NeMo Curator modules within the scheduler pod from a separate client pod. This separation ensures that the client pod can be provisioned with specific permissions tailored for executing commands and accessing resources within the Kubernetes environment.

Moreover, deploying this client pod can be orchestrated by another service such as AWS Batch, facilitating scalable and efficient management of computational tasks within Kubernetes clusters.


.. code-block:: yaml

   cat <<EOF | kubectl apply -f -
   apiVersion: rbac.authorization.k8s.io/v1
   kind: ClusterRole
   metadata:
     name: pod-exec
   rules:
   - apiGroups:
     - ""
     resources:
     - pods
     - pods/exec
     verbs:
     - list
     - get
     - watch
     - create
   ---
   apiVersion: rbac.authorization.k8s.io/v1
   kind: ClusterRoleBinding
   metadata:
     name: allow-pods-exec
   subjects:
   - kind: ServiceAccount
     name: default
     namespace: default
   roleRef:
     kind: ClusterRole
     name: pod-exec
     apiGroup: rbac.authorization.k8s.io
   EOF

The above YAML creates a ClusterRole and a ClusterRoleBinding (a quick way to verify the resulting permissions is sketched after the definitions below).

ClusterRole Definition:
* Specifies permissions (rules) for interacting with Kubernetes pods.
* `resources`: `["pods", "pods/exec"]` specifies the resources pods and pods/exec.
* `verbs`: `["list", "get", "watch", "create"]` lists the actions allowed on these resources (`list`, `get`, `watch`, `create`).


ClusterRoleBinding Definition:
* Binds the `pod-exec` ClusterRole to a specific ServiceAccount (`default` in the `default` namespace).
* This means that any pods using the `default` ServiceAccount in the `default` namespace will have the permissions specified in the `pod-exec` ClusterRole.
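
As a quick sanity check, you can ask the API server whether the default ServiceAccount is now allowed to exec into pods:

.. code-block:: bash

   kubectl auth can-i create pods/exec \
     --as=system:serviceaccount:default:default
   # Prints "yes" once the ClusterRoleBinding is in effect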


Now, we can spin up a client pod.

.. code-block:: yaml

   cat <<EOF | kubectl apply -f -
   apiVersion: v1
   kind: Pod
   metadata:
     name: client-pod
     labels:
       app: client
   spec:
     containers:
     - name: client
       image: python:3.10-slim-bullseye
       command: ["sh", "-c", "pip install kubernetes && sleep infinity"]
   EOF

Here, we use a lightweight public Python Docker image and install the kubernetes Python client package so that we can run kubeclient.py from this client pod and connect to the scheduler pod to run existing NeMo Curator modules.

Once the client-pod is up and running, we can copy the kubeclient.py script into the client pod and run the script.

.. code-block:: bash

   kubectl cp examples/k8s/kubeclient.py client-pod:kubeclient.py
   kubectl exec client-pod -- python3 kubeclient.py \
     --command "add_id --scheduler-address localhost:8786 --input-data-dir=/nemo-workspace/arxiv --output-data-dir=/nemo-workspace/arxiv-addid/"
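
After the command completes, you can optionally confirm that the output landed on the shared PVC by listing the output directory from the scheduler pod (a sketch; the label selector matches the one used in kubeclient.py):

.. code-block:: bash

   SCHEDULER_POD=$(kubectl get pods -l dask.org/component=scheduler \
     -o jsonpath='{.items[0].metadata.name}')
   kubectl exec "$SCHEDULER_POD" -- ls /nemo-workspace/arxiv-addid/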


3 changes: 3 additions & 0 deletions docs/user-guide/index.rst
@@ -36,6 +36,9 @@
:ref:`Next Steps <data-curator-next-steps>`
Now that you've curated your data, let's discuss where to go next in the NeMo Framework to put it to good use.

`NeMo Curator on AWS <https://github.com/NVIDIA/NeMo-Curator/tree/main/docs/user-guide/aws-examples/>`__
Review comment (Collaborator): Can you follow the pattern of other elements in the list and do something like:

:ref:`NeMo Curator on AWS <data-curator-aws-examples>`

Then create an aws-examples/index.rst with something like this:

.. _data-curator-aws-examples:

==================
AWS
==================

.. toctree::
   :maxdepth: 4
   :titlesonly:

  eks.rst

Demonstration of how to run the existing NeMo Curator modules on AWS services

`Tutorials <https://github.com/NVIDIA/NeMo-Curator/tree/main/tutorials>`__
To get started, you can explore the NeMo Curator GitHub repository and follow the available tutorials and notebooks. These resources cover various aspects of data curation, including training from scratch and Parameter-Efficient Fine-Tuning (PEFT).

51 changes: 51 additions & 0 deletions examples/k8s/kubeclient.py
@@ -0,0 +1,51 @@
import argparse

from kubernetes import client, config
from kubernetes.stream import stream


def execute_command_in_scheduler_pod(api_instance, pod_name, namespace, command):
    # Construct command to execute
    exec_command = ["/bin/sh", "-c", command]

    # Execute the command in the pod
    resp = stream(
        api_instance.connect_get_namespaced_pod_exec,
        pod_name,
        namespace,
        command=exec_command,
        stderr=True,
        stdin=False,
        stdout=True,
        tty=False,
    )
    print("Response: " + resp)


def get_scheduler_pod(api_instance, label_selector):
    scheduler_pods = api_instance.list_pod_for_all_namespaces(
        watch=False, label_selector=label_selector
    )
    # This returns the name of the first scheduler pod from the list
    return scheduler_pods.items[0].metadata.name


if __name__ == "__main__":

    parser = argparse.ArgumentParser()
    parser.add_argument("--command", type=str, required=True)
    parser.add_argument("--kubeconfig", type=str)
    args = parser.parse_args()

    # Load kube config using either the provided kubeconfig or the service account
    if args.kubeconfig:  # Check if args.kubeconfig is not None
        config.load_kube_config(args.kubeconfig)
    else:
        config.load_incluster_config()

    # Create Kubernetes API client
    api_instance = client.CoreV1Api()

    pod_name = get_scheduler_pod(api_instance, "dask.org/component=scheduler")
    namespace = "default"
    execute_command_in_scheduler_pod(api_instance, pod_name, namespace, args.command)