Initial commit for the task 'make cmf logging easier' (#140)
* Initial commit for the task 'make cmf logging easier'

* deleted irrelevant files

* Delete examples/simplifed-cmf/jupyter_example_readme.md

Deleting jupyter_example_readme.md as it is not needed anymore

* making some interim changes

* Update README.md

* Update README.md

* Update README.md

* Update README.md

* Update README.md

* changing simplified-cmf to nano-cmf

* Addressed review comments

* Update README.md of nano-cmf

* Addressed review comments

* addressed review comments

* addressing review changes
varkha-d-sharma authored May 9, 2024
1 parent 3d838f2 commit 8ecdd7e
Showing 11 changed files with 271 additions and 13 deletions.
62 changes: 61 additions & 1 deletion cmflib/cmf.py
@@ -118,6 +118,10 @@ def __init__(
        Cmf.__prechecks()
        if custom_properties is None:
            custom_properties = {}
        if not pipeline_name:
            # assign folder name as pipeline name
            cur_folder = os.path.basename(os.getcwd())
            pipeline_name = cur_folder
        config = mlpb.ConnectionConfig()
        config.sqlite.filename_uri = filename
        self.store = metadata_store.MetadataStore(config)
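With this change, `Cmf()` no longer requires an explicit `pipeline_name`: when the argument is empty, the name of the current working directory is used instead. A minimal standalone sketch of the fallback logic (illustrative only, not a cmflib API call):

```python
import os

pipeline_name = ""  # user did not supply a name
if not pipeline_name:
    # Fall back to the current folder name,
    # e.g. /home/user/nano-cmf -> "nano-cmf"
    pipeline_name = os.path.basename(os.getcwd())
```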
@@ -344,6 +348,15 @@ def create_execution(
        Returns:
            Execution object from ML Metadata library associated with the new execution for this stage.
        """
        # Assigning current file name as stage and execution name
        current_script = sys.argv[0]
        file_name = os.path.basename(current_script)
        name_without_extension = os.path.splitext(file_name)[0]
        # create context if not already created
        if not self.child_context:
            self.create_context(pipeline_stage=name_without_extension)
            assert self.child_context is not None, f"Failed to create context for {self.pipeline_name}!!"

        # Initializing the execution related fields
        self.metrics = {}
        self.input_artifacts = []
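With the added guard, `create_execution()` can be called without a prior `create_context()`: the missing context is created on demand, named after the running script. A usage sketch (the filename and pipeline name are hypothetical):

```python
from cmflib import cmf

# Running this file as `python train.py` derives the stage name "train",
# so no explicit create_context() call is needed before create_execution().
metawriter = cmf.Cmf(filename="mlmd", pipeline_name="demo-pipeline")
execution = metawriter.create_execution(execution_type="train")
```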
@@ -623,11 +636,25 @@ def log_dataset(
        Returns:
            Artifact object from ML Metadata library associated with the new dataset artifact.
        """

        # Assigning current file name as stage and execution name
        current_script = sys.argv[0]
        file_name = os.path.basename(current_script)
        name_without_extension = os.path.splitext(file_name)[0]
        # create context if not already created
        if not self.child_context:
            self.create_context(pipeline_stage=name_without_extension)
            assert self.child_context is not None, f"Failed to create context for {self.pipeline_name}!!"

        # create execution if not already created
        if not self.execution:
            self.create_execution(execution_type=name_without_extension)
            assert self.execution is not None, f"Failed to create execution for {self.pipeline_name}!!"

        ### To Do: Technical debt.
        # If the dataset already exists, we just link the existing dataset to the execution;
        # we do not update the dataset properties.
        # We need to append the new properties to the existing dataset properties.

        custom_props = {} if custom_properties is None else custom_properties
        git_repo = git_get_repo()
        name = re.split("/", url)[-1]
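Because the same guard now runs inside `log_dataset()`, a bare `Cmf` instance can log a dataset directly. A sketch, assuming a cmf-initialized repo and an existing data file:

```python
from cmflib import cmf

metawriter = cmf.Cmf()  # pipeline name defaults to the folder name
# No create_context()/create_execution() calls are needed: log_dataset()
# bootstraps both from the running script's name before linking the artifact.
metawriter.log_dataset("artifacts/data.xml.gz", "input")
```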
@@ -954,6 +981,22 @@ def log_model(
        Returns:
            Artifact object from ML Metadata library associated with the new model artifact.
        """

        # Assigning current file name as stage and execution name
        current_script = sys.argv[0]
        file_name = os.path.basename(current_script)
        name_without_extension = os.path.splitext(file_name)[0]
        # create context if not already created
        if not self.child_context:
            self.create_context(pipeline_stage=name_without_extension)
            assert self.child_context is not None, f"Failed to create context for {self.pipeline_name}!!"

        # create execution if not already created
        if not self.execution:
            self.create_execution(execution_type=name_without_extension)
            assert self.execution is not None, f"Failed to create execution for {self.pipeline_name}!!"

        # To Do: Technical debt.
        # If the model already exists, we just link the existing model to the execution;
        # we do not update the model properties.
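`log_model()` gets the identical bootstrap, so models can be logged with the same two-line pattern. A sketch (the model path is hypothetical):

```python
from cmflib import cmf

metawriter = cmf.Cmf()
# Context and execution are auto-created from the script name if absent.
metawriter.log_model("model/model.pkl", "output")
```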
@@ -1235,6 +1278,7 @@ def log_execution_metrics_from_client(self, metrics_name: str,
        Returns:
            Artifact object from the ML Protocol Buffers library associated with the metrics artifact.
        """

        metrics = None
        custom_props = {} if custom_properties is None else custom_properties
        existing_artifact = []
@@ -1313,6 +1357,22 @@ def log_execution_metrics(
        Returns:
            Artifact object from ML Metadata library associated with the new coarse-grained metrics artifact.
        """

        # Assigning current file name as stage and execution name
        current_script = sys.argv[0]
        file_name = os.path.basename(current_script)
        name_without_extension = os.path.splitext(file_name)[0]
        # create context if not already created
        if not self.child_context:
            self.create_context(pipeline_stage=name_without_extension)
            assert self.child_context is not None, f"Failed to create context for {self.pipeline_name}!!"

        # create execution if not already created
        if not self.execution:
            self.create_execution(execution_type=name_without_extension)
            assert self.execution is not None, f"Failed to create execution for {self.pipeline_name}!!"

        custom_props = {} if custom_properties is None else custom_properties
        uri = str(uuid.uuid1())
        metrics_name = metrics_name + ":" + uri + ":" + str(self.execution.id)
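`log_execution_metrics()` follows suit; note from the last line above that the stored name is composed as `<metrics_name>:<uuid>:<execution_id>`. A usage sketch (metric names and values are illustrative):

```python
from cmflib import cmf

metawriter = cmf.Cmf()
# As with log_dataset()/log_model(), the context and execution are created on demand.
metawriter.log_execution_metrics("training_metrics", {"accuracy": 0.97, "loss": 0.08})
```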
17 changes: 5 additions & 12 deletions examples/example-get-started/README.md
@@ -6,24 +6,17 @@

2. Create a Python virtual environment (version >= 3.6 and < 3.9), install git, and install the Python dependencies.

-3. Modify the [sample_env](./sample_env) file with appropriate values for the exports.
+3. Initialise the project using the [`cmf init`](./../../docs/cmf_client/cmf_client.md#cmf-init) command.

-4. Execute: `source sample_env`. This script will export several environment variables used in
-   the [initialize.sh](./initialize.sh) script.
-
-5. Execute: `sh initialize.sh`. This step performs the initialization for the directory: it inits a git repo and
-   a dvc repo, and adds a git remote and a dvc remote.
-
-6. Execute `sh test_script.sh`. This file mimics a Machine Learning pipeline. It has the following stages:
+4. Execute `sh test_script.sh`. This file mimics a Machine Learning pipeline. It has the following stages:
   [parse](./src/parse.py), [featurize](./src/featurize.py), [train](./src/train.py) and [test](./src/test.py). It will
   run the pipeline and store its pipeline metadata in a sqlite file named mlmd. Verify that all stages are done
   using the "git log" command. You should see commits corresponding to the artifacts that were created.

-7. Execute `dvc push` to push the artifacts to the dvc remote.
-
-8. To track the metadata of the artifacts, push the metadata files to git: `git push origin`.
+5. Execute the [`cmf artifact push`](./../../docs/cmf_client/cmf_client.md#cmf-artifact) command to push artifacts to the artifact repo.

+6. Execute the [`cmf metadata push`](./../../docs/cmf_client/cmf_client.md#cmf-metadata) command to push metadata to the central cmf server. To start the cmf-server, see [cmf-server.md](./../../docs/cmf_server/cmf-server.md).

### Query
The stored metadata can be explored using the query layer. Example Jupyter notebook
[Query_Tester-base_mlmd.ipynb](./Query_Tester-base_mlmd.ipynb) can be found in this directory.
27 changes: 27 additions & 0 deletions examples/nano-cmf/README.md
@@ -0,0 +1,27 @@
# Example Machine Learning pipeline with metadata tracking and artifact versioning using CMF.
<p>
In the traditional approach in CMF, the typical sequence involves initializing the Cmf class, creating a context (analogous to a stage in a Machine Learning pipeline), and creating an execution before logging datasets, models, or metrics. However, CMF provides a streamlined feature wherein users can log datasets, models, and metrics without explicitly creating a context and an execution. This capability simplifies the logging process and enhances the user experience, allowing for more efficient and concise interactions with the framework.
</p>
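A short side-by-side sketch of the two styles (file and stage names are illustrative):

```python
from cmflib import cmf

# Traditional flow: every step is explicit.
metawriter = cmf.Cmf(filename="mlmd", pipeline_name="nano-cmf")
metawriter.create_context(pipeline_stage="parse")
metawriter.create_execution(execution_type="parse")
metawriter.log_dataset("artifacts/data.xml.gz", "input")

# Simplified flow: the pipeline name defaults to the folder name, and the
# context and execution are created on demand, named after the running script.
metawriter = cmf.Cmf()
metawriter.log_dataset("artifacts/data.xml.gz", "input")
```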

### Steps to reproduce

1. Copy contents of `nano-cmf` directory to a separate directory outside this repository.

2. Create a Python virtual environment (version >= 3.7 and < 3.9), install git, and install the Python dependencies.

3. Initialise the project using the [`cmf init`](./../../docs/cmf_client/cmf_client.md#cmf-init) command.

4. Execute `sh test_script.sh`. This file mimics a Machine Learning pipeline. It has one stage:
   [parse](./src/parse.py). It will run the pipeline and store its pipeline metadata in a sqlite file named `mlmd`.

5. Execute the [`cmf artifact push`](./../../docs/cmf_client/cmf_client.md#cmf-artifact) command to push artifacts to the artifact repo.

6. Execute the [`cmf metadata push`](./../../docs/cmf_client/cmf_client.md#cmf-metadata) command to push metadata to the central cmf server. To start the cmf-server, see [cmf-server.md](./../../docs/cmf_server/cmf-server.md).


### Query
The stored metadata can be explored using the query layer. Example Jupyter notebook
[Query_Tester-base_mlmd.ipynb](./Query_Tester-base_mlmd.ipynb) can be found in this directory.

### Clean Up
Metadata is stored in sqlite file named "mlmd". To clean up, delete the "mlmd" file.
Binary file added examples/nano-cmf/artifacts/data.xml.gz
20 changes: 20 additions & 0 deletions examples/nano-cmf/docker-compose.yml
@@ -0,0 +1,20 @@
version: '3.8'

services:
  minio:
    image: minio/minio:RELEASE.2021-02-11T08-23-43Z
    container_name: s3
    ports:
      - "9000:9000"
    command: minio server data/

  aws:
    image: amazon/aws-cli
    container_name: aws-cli
    command: -c "sleep 2 && aws --endpoint-url http://${MYIP}:9000 s3 mb s3://dvc-art --region eu-west-1 || exit 0"
    entrypoint: [/bin/bash]
    environment:
      AWS_ACCESS_KEY_ID: minioadmin
      AWS_SECRET_ACCESS_KEY: minioadmin
    depends_on:
      - minio
12 changes: 12 additions & 0 deletions examples/nano-cmf/params.yaml
@@ -0,0 +1,12 @@
parse:
  split: 0.20
  seed: 20170428

featurize:
  max_features: 3000
  ngrams: 2

train:
  seed: 20170428
  n_est: 100
  min_split: 64
4 changes: 4 additions & 0 deletions examples/nano-cmf/requirements.txt
@@ -0,0 +1,4 @@
dvc
click
pandas
PyYAML
97 changes: 97 additions & 0 deletions examples/nano-cmf/src/parse.py
@@ -0,0 +1,97 @@
###
# Copyright (2022) Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###

import io
import os
import re
import sys
import yaml
import gzip
import random
import typing as t
import collections
import click
import xml.etree.ElementTree
from cmflib import cmf

__all__ = ['parse']


""" In the traditional approach in CMF, the typical sequence involves the
initialization of the Cmf class, the creation of a context (analogous
to a stage in Machine Learning), and the creation of an execution before
logging datasets, models, or metrics.
However, CMF provides a streamlined feature wherein users have the flexibility
to log datasets, models, and metrics without the explicit requirement of
creating a context and an execution.
"""


def _process_posts(fd_in: t.IO, fd_out_train: t.IO, fd_out_test: t.IO, target_tag: str, split: float) -> None:
    for idx, line in enumerate(fd_in):
        try:
            fd_out = fd_out_train if random.random() > split else fd_out_test
            attr = xml.etree.ElementTree.fromstring(line).attrib

            pid = attr.get("Id", "")
            label = 1 if target_tag in attr.get("Tags", "") else 0
            title = re.sub(r"\s+", " ", attr.get("Title", "")).strip()
            body = re.sub(r"\s+", " ", attr.get("Body", "")).strip()
            text = title + " " + body

            fd_out.write("{}\t{}\t{}\n".format(pid, label, text))
        except Exception as ex:
            sys.stderr.write(f"Skipping the broken line {idx}: {ex}\n")


def parse(input_file: str, output_dir: str) -> None:
    """ Parse the input file (input_file) and create train/test files in the output_dir directory.
    Args:
        input_file: Path to a compressed (.gz) XML-lines file (data.xml.gz).
        output_dir: Path to a directory that will contain train (train.tsv) and test (test.tsv) files.
    Machine Learning Artifacts:
        Input: ${input_file}
        Output: ${output_dir}/train.tsv, ${output_dir}/test.tsv
    """
    params = yaml.safe_load(open("params.yaml"))["parse"]
    random.seed(params["seed"])
    # The Cmf class takes four parameters: filename, pipeline_name, custom_properties, graph.
    # Users can pass any combination of these four.
    metawriter = cmf.Cmf()
    _ = metawriter.log_dataset(input_file, "input", custom_properties={"user-metadata1": "metadata_value"})

    os.makedirs(output_dir, exist_ok=True)
    Dataset = collections.namedtuple('Dataset', ['train', 'test'])
    output_ds = Dataset(train=os.path.join(output_dir, "train.tsv"), test=os.path.join(output_dir, "test.tsv"))

    with gzip.open(input_file, "rb") as fd_in,\
            io.open(output_ds.train, "w", encoding="utf8") as fd_out_train,\
            io.open(output_ds.test, "w", encoding="utf8") as fd_out_test:
        _process_posts(fd_in, fd_out_train, fd_out_test, "<python>", params["split"])

    _ = metawriter.log_dataset(output_ds.train, "output")
    _ = metawriter.log_dataset(output_ds.test, "output")


@click.command()
@click.argument('input_file', required=True, type=str)
@click.argument('output_dir', required=True, type=str)
def parse_cli(input_file: str, output_dir: str) -> None:
    parse(input_file, output_dir)


if __name__ == '__main__':
    parse_cli()
34 changes: 34 additions & 0 deletions examples/nano-cmf/src/query.py
@@ -0,0 +1,34 @@
import click
import typing as t
import pandas as pd
from cmflib import cmfquery
from tabulate import tabulate

__all__ = ['query']


def _print_executions_in_stage(cmf_query: cmfquery.CmfQuery, stage_name: str) -> None:
    print('\n')
    print('\n')
    df: pd.DataFrame = cmf_query.get_all_executions_in_stage(stage_name)
    df.drop(columns=['Git_Start_Commit', 'Git_End_Commit', 'Python_Env'], inplace=True)
    print(tabulate(df, headers='keys', tablefmt='psql'))


def query(mlmd_path: str) -> None:
    cmf_query = cmfquery.CmfQuery(mlmd_path)
    stages: t.List[str] = cmf_query.get_pipeline_stages("nano-cmf")
    print(stages)

    for stage in stages:
        _print_executions_in_stage(cmf_query, stage)


@click.command()
@click.argument('mlmd_path', required=True, type=str)
def query_cli(mlmd_path: str):
    query(mlmd_path)


if __name__ == '__main__':
    query_cli()
4 changes: 4 additions & 0 deletions examples/nano-cmf/src/requirements.txt
@@ -0,0 +1,4 @@
pandas
pyaml
scikit-learn
scipy
7 changes: 7 additions & 0 deletions examples/nano-cmf/test_script.sh
@@ -0,0 +1,7 @@
#!/bin/bash

printf "\n[RUNNING PARSE STEP ]\n"
python src/parse.py artifacts/data.xml.gz artifacts/parsed

printf "\n[RUNNING OPTIONAL QUERY STEP]\n"
python src/query.py mlmd
