Dataslice and StepMetrics: artifact and metadata pull and push related changes #189

Merged · 9 commits · Jul 25, 2024
4 changes: 4 additions & 0 deletions cmflib/bin/cmf
@@ -4,6 +4,10 @@ import re
import sys
from cmflib.cli import main

+# this is temporary - need to remove after TripleDES warning goes away from paramiko
+import warnings
+warnings.filterwarnings(action='ignore', module='.*paramiko.*')

if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
sys.exit(main())
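As a side note, a narrower filter is possible here; the sketch below is not part of this PR and assumes the TripleDES warning is a `CryptographyDeprecationWarning` from the `cryptography` package, which is what paramiko currently triggers:

```python
# Sketch: ignore only the TripleDES deprecation raised from paramiko modules,
# instead of silencing every warning category from paramiko.
import warnings

try:
    from cryptography.utils import CryptographyDeprecationWarning
    warnings.filterwarnings(
        action="ignore",
        category=CryptographyDeprecationWarning,
        module=r".*paramiko.*",
    )
except ImportError:
    # cryptography not installed; nothing to silence.
    pass
```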
175 changes: 133 additions & 42 deletions cmflib/cmf.py
@@ -646,7 +646,6 @@ def log_dataset(
Returns:
Artifact object from ML Metadata library associated with the new dataset artifact.
"""

# Assigning current file name as stage and execution name
current_script = sys.argv[0]
file_name = os.path.basename(current_script)
@@ -679,7 +678,7 @@ def log_dataset(

if c_hash == "":
print("Error in getting the dvc hash,return without logging")
-return null
+return

dataset_commit = c_hash
dvc_url = dvc_get_url(url)
@@ -1033,7 +1032,7 @@ def log_model(

if c_hash == "":
print("Error in getting the dvc hash,return without logging")
-return null
+return

model_commit = c_hash

@@ -1478,29 +1477,53 @@ def commit_metrics(self, metrics_name: str):
Returns:
Artifact object from the ML Protocol Buffers library associated with the new metrics artifact.
"""

+logging_dir = change_dir(self.cmf_init_path)
+# code for nano cmf
+# Assigning current file name as stage and execution name
+current_script = sys.argv[0]
+file_name = os.path.basename(current_script)
+name_without_extension = os.path.splitext(file_name)[0]
+# create context if not already created
+if not self.child_context:
+    self.create_context(pipeline_stage=name_without_extension)
+    assert self.child_context is not None, f"Failed to create context for {self.pipeline_name}!!"
+
+# create execution if not already created
+if not self.execution:
+    self.create_execution(execution_type=name_without_extension)
+    assert self.execution is not None, f"Failed to create execution for {self.pipeline_name}!!"

+directory_path = os.path.join("cmf_artifacts/metrics", self.execution.properties["Execution_uuid"].string_value)
+os.makedirs(directory_path, exist_ok=True)
metrics_df = pd.DataFrame.from_dict(
self.metrics[metrics_name], orient="index")
metrics_df.index.names = ["SequenceNumber"]
-metrics_df.to_parquet(metrics_name)
-commit_output(metrics_name, self.execution.id)
-uri = dvc_get_hash(metrics_name)
+metrics_path = os.path.join(directory_path, metrics_name)
+metrics_df.to_parquet(metrics_path)
+commit_output(metrics_path, self.execution.id)
+uri = dvc_get_hash(metrics_path)

if uri == "":
print("Error in getting the dvc hash,return without logging")
-return null
+return
metrics_commit = uri
+dvc_url = dvc_get_url(metrics_path)
+dvc_url_with_pipeline = f"{self.parent_context.name}:{dvc_url}"
name = (
-    metrics_name
+    metrics_path
+ ":"
+ uri
+ ":"
+ str(self.execution.id)
+ ":"
+ str(uuid.uuid1())
)
-# passing uri value to commit
-custom_props = {"Name": metrics_name, "Commit": metrics_commit}
+# not needed, as the property 'name' is already part of the artifact;
+# to maintain uniformity, Commit goes into the properties of the artifact
+# custom_props = {"Name": metrics_name, "Commit": metrics_commit}
+custom_props = {}
metrics = create_new_artifact_event_and_attribution(
store=self.store,
execution_id=self.execution.id,
@@ -1509,6 +1532,15 @@ def commit_metrics(self, metrics_name: str):
name=name,
type_name="Step_Metrics",
event_type=mlpb.Event.Type.OUTPUT,
+properties={
+    # passing uri value to commit
+    "Commit": metrics_commit,
+    "url": str(dvc_url_with_pipeline),
+},
+artifact_type_properties={
+    "Commit": mlpb.STRING,
+    "url": mlpb.STRING,
+},
custom_properties=custom_props,
milliseconds_since_epoch=int(time.time() * 1000),
)
@@ -1537,20 +1569,20 @@ def commit_metrics(self, metrics_name: str):
os.chdir(logging_dir)
return metrics
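For orientation, a minimal usage sketch of the reworked flow; this is hedged: it assumes the usual `Cmf` constructor and the `log_metric` accumulation API, and the pipeline and metric names are illustrative:

```python
from cmflib import cmf

metawriter = cmf.Cmf(filepath="mlmd", pipeline_name="Test-env")

# Accumulate per-step metrics, then commit them in one shot.
# commit_metrics now creates the context/execution itself if needed.
for loss in [0.9, 0.5, 0.3]:
    metawriter.log_metric("training_metrics", {"loss": str(loss)})

# With this change the parquet file lands under
# cmf_artifacts/metrics/<Execution_uuid>/training_metrics instead of the
# current working directory, and Commit/url become typed artifact properties.
metawriter.commit_metrics("training_metrics")
```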

-def commit_existing_metrics(self, metrics_name: str, uri: str, custom_properties: t.Optional[t.Dict] = None):
+def commit_existing_metrics(self, metrics_name: str, uri: str, props: t.Optional[t.Dict] = None, custom_properties: t.Optional[t.Dict] = None):
    """
    Commits existing metrics associated with the given URI to MLMD.
    Example:
    ```python
    artifact: mlpb.Artifact = cmf.commit_existing_metrics("existing_metrics", "abc123",
                                                          {"custom_key": "custom_value"})
    ```
    Args:
        metrics_name: Name of the metrics.
        uri: Unique identifier associated with the metrics.
        props: Optional typed properties (e.g. Commit, url) carried over from the original artifact.
        custom_properties: Optional custom properties for the metrics.
    Returns:
        Artifact object from the ML Protocol Buffers library associated with the existing metrics artifact.
"""

custom_props = {} if custom_properties is None else custom_properties
@@ -1575,6 +1607,15 @@ def commit_existing_metrics(self, metrics_name: str, uri: str, custom_properties
name=metrics_name,
type_name="Step_Metrics",
event_type=mlpb.Event.Type.OUTPUT,
+properties={
+    # passing uri value to commit
+    "Commit": props.get("Commit", ""),
+    "url": props.get("url", ""),
+},
+artifact_type_properties={
+    "Commit": mlpb.STRING,
+    "url": mlpb.STRING,
+},
custom_properties=custom_props,
milliseconds_since_epoch=int(time.time() * 1000),
)
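A hedged sketch of how a caller would now pass the promoted properties; the key names mirror the `props.get(...)` lookups above, and the hash, path, and tag values are placeholders:

```python
# Illustrative values only; in practice these come from the exported MLMD JSON.
uri = "a1b2c3d4e5f6"  # dvc hash of the metrics parquet file
props = {
    "Commit": uri,
    "url": "Test-env:/path/to/remote/files/md5/a1/b2c3d4e5f6",
}
custom_props = {"user_tag": "experiment-42"}

artifact = cmf_class.commit_existing_metrics("training_metrics", uri, props, custom_props)
```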
@@ -1680,6 +1721,8 @@ def create_dataslice(self, name: str) -> "Cmf.DataSlice":
def read_dataslice(self, name: str) -> pd.DataFrame:
"""Reads the dataslice"""
# To do checkout if not there
directory_path = os.path.join("cmf_artifacts/dataslices",self.execution.properties["Execution_uuid"].string_value)
name = os.path.join(directory_path, name)
df = pd.read_parquet(name)
return df

@@ -1700,6 +1743,8 @@ def update_dataslice(self, name: str, record: str, custom_properties: t.Dict):
Returns:
None
"""
directory_path = os.path.join("cmf_artifacts/dataslices", self.execution.properties["Execution_uuid"].string_value)
name = os.path.join(directory_path, name)
df = pd.read_parquet(name)
temp_dict = df.to_dict("index")
temp_dict[record].update(custom_properties)
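Both helpers now resolve the slice name against the execution-scoped directory before reading the parquet file. A hedged usage sketch, assuming `metawriter` is a `Cmf` instance with an active execution and the record key is a path previously registered via `DataSlice.add_data`:

```python
# Reads cmf_artifacts/dataslices/<Execution_uuid>/slice-a.parquet
df = metawriter.read_dataslice("slice-a.parquet")

# Update one record's custom properties in place; the parquet index is "Path".
metawriter.update_dataslice(
    "slice-a.parquet",
    record="data/raw/file-1.txt",
    custom_properties={"verified": "True"},
)
```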
@@ -1739,7 +1784,7 @@ def add_data(
"""

self.props[path] = {}
-# self.props[path]['hash'] = dvc_get_hash(path)
+self.props[path]['hash'] = dvc_get_hash(path)
parent_path = path.rsplit("/", 1)[0]
self.data_parent = parent_path.rsplit("/", 1)[1]
if custom_properties:
@@ -1765,71 +1810,107 @@ def commit(self, custom_properties: t.Optional[t.Dict] = None) -> None:
custom_properties: Dictionary to store key value pairs associated with Dataslice
Example{"mean":2.5, "median":2.6}
"""

+logging_dir = change_dir(self.writer.cmf_init_path)
+# code for nano cmf
+# Assigning current file name as stage and execution name
+current_script = sys.argv[0]
+file_name = os.path.basename(current_script)
+name_without_extension = os.path.splitext(file_name)[0]
+# create context if not already created
+if not self.writer.child_context:
+    self.writer.create_context(pipeline_stage=name_without_extension)
+    assert self.writer.child_context is not None, f"Failed to create context for {self.pipeline_name}!!"
+
+# create execution if not already created
+if not self.writer.execution:
+    self.writer.create_execution(execution_type=name_without_extension)
+    assert self.writer.execution is not None, f"Failed to create execution for {self.pipeline_name}!!"

directory_path = os.path.join( "cmf_artifacts/dataslices",self.writer.execution.properties["Execution_uuid"].string_value)
os.makedirs(directory_path, exist_ok=True)
custom_props = {} if custom_properties is None else custom_properties
git_repo = git_get_repo()
dataslice_df = pd.DataFrame.from_dict(self.props, orient="index")
dataslice_df.index.names = ["Path"]
-dataslice_df.to_parquet(self.name)
+dataslice_path = os.path.join(directory_path, self.name)
+dataslice_df.to_parquet(dataslice_path)
existing_artifact = []

-commit_output(self.name, self.writer.execution.id)
-c_hash = dvc_get_hash(self.name)
+commit_output(dataslice_path, self.writer.execution.id)
+c_hash = dvc_get_hash(dataslice_path)
if c_hash == "":
print("Error in getting the dvc hash,return without logging")
-return null
+return

dataslice_commit = c_hash
-remote = dvc_get_url(self.name)
+url = dvc_get_url(dataslice_path)
+dvc_url_with_pipeline = f"{self.writer.parent_context.name}:{url}"
if c_hash and c_hash.strip():
existing_artifact.extend(
self.writer.store.get_artifacts_by_uri(c_hash))
if existing_artifact and len(existing_artifact) != 0:
print("Adding to existing data slice")
# Haven't added event type in this if cond, is it not needed??
slice = link_execution_to_input_artifact(
store=self.writer.store,
execution_id=self.writer.execution.id,
uri=c_hash,
-input_name=self.name + ":" + c_hash,
+input_name=dataslice_path + ":" + c_hash,
)
else:
-props = {
-    "Commit": dataslice_commit,  # passing c_hash value to commit
-    "git_repo": git_repo,
-    "Remote": remote,
-}
-props.update(custom_props)
+props = {
+    "git_repo": str(git_repo),
+    # passing c_hash value to commit
+    "Commit": str(dataslice_commit),
+    "url": str(dvc_url_with_pipeline),
+}
slice = create_new_artifact_event_and_attribution(
store=self.writer.store,
execution_id=self.writer.execution.id,
context_id=self.writer.child_context.id,
uri=c_hash,
-name=self.name + ":" + c_hash,
+name=dataslice_path + ":" + c_hash,
type_name="Dataslice",
event_type=mlpb.Event.Type.OUTPUT,
-custom_properties=props,
+properties={
+    "git_repo": str(git_repo),
+    # passing c_hash value to commit
+    "Commit": str(dataslice_commit),
+    "url": str(dvc_url_with_pipeline),
+},
+artifact_type_properties={
+    "git_repo": mlpb.STRING,
+    "Commit": mlpb.STRING,
+    "url": mlpb.STRING,
+},
+custom_properties=custom_props,
milliseconds_since_epoch=int(time.time() * 1000),
)
if self.writer.graph:
self.writer.driver.create_dataslice_node(
-self.name, self.name + ":" + c_hash, c_hash, self.data_parent, props
+self.name, dataslice_path + ":" + c_hash, c_hash, self.data_parent, props
)
+os.chdir(logging_dir)
return slice
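End to end, the dataslice flow after this change looks roughly like the sketch below. This is hedged: file paths and property values are illustrative, `add_data` is assumed to take a path plus optional custom properties (matching the `self.props[path]` bookkeeping above), and `commit` now writes the parquet under the execution-scoped directory:

```python
dataslice = metawriter.create_dataslice(name="slice-a.parquet")
dataslice.add_data("data/raw/file-1.txt", {"split": "train"})
dataslice.add_data("data/raw/file-2.txt", {"split": "train"})

# Writes cmf_artifacts/dataslices/<Execution_uuid>/slice-a.parquet and logs
# git_repo/Commit/url as typed properties on the Dataslice artifact.
dataslice.commit(custom_properties={"mean": "2.5", "median": "2.6"})
```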

-def commit_existing(self, uri: str, custom_properties: t.Optional[t.Dict] = None) -> None:
+# commit existing dataslice to server
+def commit_existing(self, uri: str, props: t.Optional[t.Dict] = None, custom_properties: t.Optional[t.Dict] = None) -> None:
custom_props = {} if custom_properties is None else custom_properties
-c_hash = uri
+c_hash = uri.strip()
dataslice_commit = c_hash
existing_artifact = []
if c_hash and c_hash.strip():
existing_artifact.extend(
self.writer.store.get_artifacts_by_uri(c_hash))
if existing_artifact and len(existing_artifact) != 0:
print("Adding to existing data slice")
# Haven't added event type in this if cond, is it not needed??
slice = link_execution_to_input_artifact(
store=self.writer.store,
execution_id=self.writer.execution.id,
uri=c_hash,
-input_name=self.name
+input_name=self.name,
)
else:
slice = create_new_artifact_event_and_attribution(
@@ -1840,6 +1921,16 @@ def commit_existing(self, uri: str, custom_properties: t.Optional[t.Dict] = None
name=self.name,
type_name="Dataslice",
event_type=mlpb.Event.Type.OUTPUT,
+properties={
+    "git_repo": props.get("git_repo", ""),
+    "Commit": props.get("Commit", ""),
+    "url": props.get("url", " "),
+},
+artifact_type_properties={
+    "git_repo": mlpb.STRING,
+    "Commit": mlpb.STRING,
+    "url": mlpb.STRING,
+},
custom_properties=custom_properties,
milliseconds_since_epoch=int(time.time() * 1000),
)
4 changes: 2 additions & 2 deletions cmflib/cmf_merger.py
@@ -108,9 +108,9 @@ def parse_json_to_mlmd(mlmd_json, path_to_store, cmd, exec_id):
cmf_class.log_execution_metrics_from_client(event["artifact"]["name"], custom_props)
elif artifact_type == "Dataslice":
dataslice = cmf_class.create_dataslice(event["artifact"]["name"])
-dataslice.commit_existing(uri, custom_props)
+dataslice.commit_existing(uri, props, custom_props)
elif artifact_type == "Step_Metrics":
-cmf_class.commit_existing_metrics(event["artifact"]["name"], uri, custom_props)
+cmf_class.commit_existing_metrics(event["artifact"]["name"], uri, props, custom_props)
else:
pass
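For context, a hedged sketch of where `props` plausibly comes from earlier in `parse_json_to_mlmd`; the exact JSON field names are assumptions inferred from the `props.get(...)` lookups added in cmf.py above:

```python
# Illustrative only: typed properties recorded on the original artifact travel
# in the exported MLMD JSON and are replayed on the receiving side.
artifact = event["artifact"]
props = {
    "git_repo": artifact.get("properties", {}).get("git_repo", ""),
    "Commit": artifact.get("properties", {}).get("Commit", ""),
    "url": artifact.get("properties", {}).get("url", ""),
}
custom_props = artifact.get("custom_properties", {})
```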

4 changes: 2 additions & 2 deletions cmflib/commands/artifact/push.py
@@ -39,7 +39,7 @@ def run(self):
cmf_config=CmfConfig.read_config(cmf_config_file)
out_msg = check_minio_server(dvc_config_op)
if dvc_config_op["core.remote"] == "minio" and out_msg != "SUCCESS":
-return out_msg
+return "MinioS3 server failed to start!!!"
if dvc_config_op["core.remote"] == "osdf":
#print("key_id="+cmf_config["osdf-key_id"])
dynamic_password = generate_osdf_token(cmf_config["osdf-key_id"],cmf_config["osdf-key_path"],cmf_config["osdf-key_issuer"])
@@ -100,7 +100,7 @@ def run(self):
file_set = set(names)
result = dvc_push(list(file_set))
return result

def add_parser(subparsers, parent_parser):
HELP = "Push artifacts to the user configured artifact repo."
