From f322a77ae4d829b42a93e8ada13801ef34834316 Mon Sep 17 00:00:00 2001 From: Varkha Sharma Date: Tue, 23 Jul 2024 23:48:00 -0700 Subject: [PATCH 1/8] changes made for dataslice and metrics artifact and metadata pull/push --- cmflib/cmf.py | 167 +++++++++++++----- cmflib/cmf_merger.py | 4 +- examples/example-get-started/src/train.py | 3 + .../example-get-started/test-data-slice.py | 11 +- 4 files changed, 132 insertions(+), 53 deletions(-) diff --git a/cmflib/cmf.py b/cmflib/cmf.py index 4b795bb5..1f7575c3 100644 --- a/cmflib/cmf.py +++ b/cmflib/cmf.py @@ -132,6 +132,7 @@ def __init__( config.sqlite.filename_uri = filepath self.store = metadata_store.MetadataStore(config) self.filepath = filepath + self.dataslice_path = None self.child_context = None self.execution = None self.execution_name = "" @@ -646,7 +647,6 @@ def log_dataset( Returns: Artifact object from ML Metadata library associated with the new dataset artifact. """ - # Assigning current file name as stage and execution name current_script = sys.argv[0] file_name = os.path.basename(current_script) @@ -679,7 +679,7 @@ def log_dataset( if c_hash == "": print("Error in getting the dvc hash,return without logging") - return null + return dataset_commit = c_hash dvc_url = dvc_get_url(url) @@ -1033,7 +1033,7 @@ def log_model( if c_hash == "": print("Error in getting the dvc hash,return without logging") - return null + return model_commit = c_hash @@ -1478,20 +1478,40 @@ def commit_metrics(self, metrics_name: str): Returns: Artifact object from the ML Protocol Buffers library associated with the new metrics artifact. """ + # code for nano cmf is remaining + # Assigning current file name as stage and execution name + current_script = sys.argv[0] + file_name = os.path.basename(current_script) + name_without_extension = os.path.splitext(file_name)[0] + # create context if not already created + if not self.child_context: + self.create_context(pipeline_stage=name_without_extension) + assert self.child_context is not None, f"Failed to create context for {self.pipeline_name}!!" + + # create execution if not already created + if not self.execution: + self.create_execution(execution_type=name_without_extension) + assert self.execution is not None, f"Failed to create execution for {self.pipeline_name}!!" 
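+        # Both the stage context and the execution are created on demand above,
+        # so commit_metrics() also works when the caller never invoked
+        # create_context()/create_execution() explicitly.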
+        logging_dir = change_dir(self.cmf_init_path)
+        directory_path = os.path.join( "cmf_artifacts/metrics",self.execution.properties["Execution_uuid"].string_value)
+        os.makedirs(directory_path, exist_ok=True)
         metrics_df = pd.DataFrame.from_dict(
             self.metrics[metrics_name], orient="index")
         metrics_df.index.names = ["SequenceNumber"]
-        metrics_df.to_parquet(metrics_name)
-        commit_output(metrics_name, self.execution.id)
-        uri = dvc_get_hash(metrics_name)
+        metrics_path = os.path.join(directory_path,metrics_name)
+        metrics_df.to_parquet(metrics_path)
+        commit_output(metrics_path, self.execution.id)
+        uri = dvc_get_hash(metrics_path)

         if uri == "":
             print("Error in getting the dvc hash,return without logging")
-            return null
+            return

         metrics_commit = uri
+        dvc_url = dvc_get_url(metrics_path)
+        dvc_url_with_pipeline = f"{self.parent_context.name}:{dvc_url}"
         name = (
-            metrics_name
+            metrics_path
             + ":"
             + uri
             + ":"
@@ -1499,8 +1519,10 @@ def commit_metrics(self, metrics_name: str):
             + ":"
             + str(uuid.uuid1())
         )
-        # passing uri value to commit
-        custom_props = {"Name": metrics_name, "Commit": metrics_commit}
+        # "Name" is not needed as a custom property because the artifact's own
+        # 'name' field already carries it; to maintain uniformity, "Commit" moves
+        # into the typed properties of the artifact.
+        # custom_props = {"Name": metrics_name, "Commit": metrics_commit}
+        custom_props = {}
         metrics = create_new_artifact_event_and_attribution(
             store=self.store,
             execution_id=self.execution.id,
@@ -1509,6 +1531,15 @@ def commit_metrics(self, metrics_name: str):
             name=name,
             type_name="Step_Metrics",
             event_type=mlpb.Event.Type.OUTPUT,
+            properties={
+                # passing uri value to commit
+                "Commit": metrics_commit,
+                "url": str(dvc_url_with_pipeline),
+            },
+            artifact_type_properties={
+                "Commit": mlpb.STRING,
+                "url": mlpb.STRING,
+            },
             custom_properties=custom_props,
             milliseconds_since_epoch=int(time.time() * 1000),
         )
@@ -1537,20 +1568,20 @@ def commit_metrics(self, metrics_name: str):
         os.chdir(logging_dir)
         return metrics

-    def commit_existing_metrics(self, metrics_name: str, uri: str, custom_properties: t.Optional[t.Dict] = None):
-        """
-        Commits existing metrics associated with the given URI to MLMD.
-        Example:
-        ```python
-        artifact: mlpb.Artifact = cmf.commit_existing_metrics("existing_metrics", "abc123",
-        {"custom_key": "custom_value"})
-        ```
-        Args:
-        metrics_name: Name of the metrics.
-        uri: Unique identifier associated with the metrics.
-        custom_properties: Optional custom properties for the metrics.
+    def commit_existing_metrics(self, metrics_name: str, uri: str, props: t.Optional[t.Dict] = None, custom_properties: t.Optional[t.Dict] = None):
+        """
+        Commits existing metrics associated with the given URI to MLMD.
+        Example:
+        ```python
+        artifact: mlpb.Artifact = cmf.commit_existing_metrics("existing_metrics", "abc123",
+        custom_properties={"custom_key": "custom_value"})
+        ```
+        Args:
+        metrics_name: Name of the metrics.
+        uri: Unique identifier associated with the metrics.
+        props: Optional properties of the metrics (e.g. Commit, url).
+        custom_properties: Optional custom properties for the metrics.
         Returns:
-            Artifact object from the ML Protocol Buffers library associated with the existing metrics artifact.
+            Artifact object from the ML Protocol Buffers library associated with the existing metrics artifact.
""" custom_props = {} if custom_properties is None else custom_properties @@ -1575,6 +1606,15 @@ def commit_existing_metrics(self, metrics_name: str, uri: str, custom_properties name=metrics_name, type_name="Step_Metrics", event_type=mlpb.Event.Type.OUTPUT, + properties={ + # passing uri value to commit + "Commit": props.get("Commit", ""), + "url": props.get("url", ""), + }, + artifact_type_properties={ + "Commit": mlpb.STRING, + "url": mlpb.STRING, + }, custom_properties=custom_props, milliseconds_since_epoch=int(time.time() * 1000), ) @@ -1680,6 +1720,7 @@ def create_dataslice(self, name: str) -> "Cmf.DataSlice": def read_dataslice(self, name: str) -> pd.DataFrame: """Reads the dataslice""" # To do checkout if not there + name = os.path.join(self.dataslice_path, name) df = pd.read_parquet(name) return df @@ -1700,6 +1741,7 @@ def update_dataslice(self, name: str, record: str, custom_properties: t.Dict): Returns: None """ + name = os.path.join(self.dataslice_path, name) df = pd.read_parquet(name) temp_dict = df.to_dict("index") temp_dict[record].update(custom_properties) @@ -1739,7 +1781,7 @@ def add_data( """ self.props[path] = {} - # self.props[path]['hash'] = dvc_get_hash(path) + self.props[path]['hash'] = dvc_get_hash(path) parent_path = path.rsplit("/", 1)[0] self.data_parent = parent_path.rsplit("/", 1)[1] if custom_properties: @@ -1765,59 +1807,89 @@ def commit(self, custom_properties: t.Optional[t.Dict] = None) -> None: custom_properties: Dictionary to store key value pairs associated with Dataslice Example{"mean":2.5, "median":2.6} """ + # need to understand whether doing this before or after the change_dir will make a difference + # Assigning current file name as stage and execution name + current_script = sys.argv[0] + file_name = os.path.basename(current_script) + name_without_extension = os.path.splitext(file_name)[0] + # create context if not already created + if not self.writer.child_context: + self.writer.create_context(pipeline_stage=name_without_extension) + assert self.writer.child_context is not None, f"Failed to create context for {self.pipeline_name}!!" + + # create execution if not already created + if not self.writer.execution: + self.writer.create_execution(execution_type=name_without_extension) + assert self.writer.execution is not None, f"Failed to create execution for {self.pipeline_name}!!" 
+ + logging_dir = change_dir(self.writer.cmf_init_path) + directory_path = os.path.join( "cmf_artifacts/dataslices",self.writer.execution.properties["Execution_uuid"].string_value) + self.writer.dataslice_path = directory_path + os.makedirs(directory_path, exist_ok=True) custom_props = {} if custom_properties is None else custom_properties git_repo = git_get_repo() dataslice_df = pd.DataFrame.from_dict(self.props, orient="index") dataslice_df.index.names = ["Path"] - dataslice_df.to_parquet(self.name) + dataslice_path = os.path.join(directory_path,self.name) + dataslice_df.to_parquet(dataslice_path) existing_artifact = [] - commit_output(self.name, self.writer.execution.id) - c_hash = dvc_get_hash(self.name) + commit_output(dataslice_path, self.writer.execution.id) + c_hash = dvc_get_hash(dataslice_path) if c_hash == "": print("Error in getting the dvc hash,return without logging") - return null + return dataslice_commit = c_hash - remote = dvc_get_url(self.name) + url = dvc_get_url(dataslice_path) + dvc_url_with_pipeline = f"{self.writer.parent_context.name}:{url}" if c_hash and c_hash.strip(): existing_artifact.extend( self.writer.store.get_artifacts_by_uri(c_hash)) if existing_artifact and len(existing_artifact) != 0: print("Adding to existing data slice") + # Haven't added event type in this if cond, is it not needed?? slice = link_execution_to_input_artifact( store=self.writer.store, execution_id=self.writer.execution.id, uri=c_hash, - input_name=self.name + ":" + c_hash, + input_name=dataslice_path + ":" + c_hash, ) else: - props = { - "Commit": dataslice_commit, # passing c_hash value to commit - "git_repo": git_repo, - "Remote": remote, - } - props.update(custom_props) + props={ + "git_repo": str(git_repo), + # passing c_hash value to commit + "Commit": str(dataslice_commit), + "url": str(dvc_url_with_pipeline), + }, slice = create_new_artifact_event_and_attribution( store=self.writer.store, execution_id=self.writer.execution.id, context_id=self.writer.child_context.id, uri=c_hash, - name=self.name + ":" + c_hash, + name=dataslice_path + ":" + c_hash, type_name="Dataslice", event_type=mlpb.Event.Type.OUTPUT, - custom_properties=props, + properties=props, + artifact_type_properties={ + "git_repo": mlpb.STRING, + "Commit": mlpb.STRING, + "url": mlpb.STRING, + }, + custom_properties=custom_props, milliseconds_since_epoch=int(time.time() * 1000), ) if self.writer.graph: self.writer.driver.create_dataslice_node( - self.name, self.name + ":" + c_hash, c_hash, self.data_parent, props + self.name, dataslice_path + ":" + c_hash, c_hash, self.data_parent, props ) + os.chdir(logging_dir) return slice - def commit_existing(self, uri: str, custom_properties: t.Optional[t.Dict] = None) -> None: + # commit existing dataslice to server + def commit_existing(self, uri: str, props: t.Optional[t.Dict] = None, custom_properties: t.Optional[t.Dict] = None) -> None: custom_props = {} if custom_properties is None else custom_properties - c_hash = uri + c_hash = uri.strip() dataslice_commit = c_hash existing_artifact = [] if c_hash and c_hash.strip(): @@ -1825,11 +1897,12 @@ def commit_existing(self, uri: str, custom_properties: t.Optional[t.Dict] = None self.writer.store.get_artifacts_by_uri(c_hash)) if existing_artifact and len(existing_artifact) != 0: print("Adding to existing data slice") + # Haven't added event type in this if cond, is it not needed?? 
slice = link_execution_to_input_artifact( store=self.writer.store, execution_id=self.writer.execution.id, uri=c_hash, - input_name=self.name + input_name=self.name, ) else: slice = create_new_artifact_event_and_attribution( @@ -1840,6 +1913,16 @@ def commit_existing(self, uri: str, custom_properties: t.Optional[t.Dict] = None name=self.name, type_name="Dataslice", event_type=mlpb.Event.Type.OUTPUT, + properties={ + "git_repo": props.get("git_repo", ""), + "Commit": props.get("Commit", ""), + "url": props.get("url", " "), + }, + artifact_type_properties={ + "git_repo": mlpb.STRING, + "Commit": mlpb.STRING, + "url": mlpb.STRING, + }, custom_properties=custom_properties, milliseconds_since_epoch=int(time.time() * 1000), ) diff --git a/cmflib/cmf_merger.py b/cmflib/cmf_merger.py index c8519673..d2be110b 100644 --- a/cmflib/cmf_merger.py +++ b/cmflib/cmf_merger.py @@ -108,9 +108,9 @@ def parse_json_to_mlmd(mlmd_json, path_to_store, cmd, exec_id): cmf_class.log_execution_metrics_from_client(event["artifact"]["name"], custom_props) elif artifact_type == "Dataslice": dataslice = cmf_class.create_dataslice(event["artifact"]["name"]) - dataslice.commit_existing(uri, custom_props) + dataslice.commit_existing(uri, props, custom_props) elif artifact_type == "Step_Metrics": - cmf_class.commit_existing_metrics(event["artifact"]["name"], uri, custom_props) + cmf_class.commit_existing_metrics(event["artifact"]["name"], uri, props, custom_props) else: pass diff --git a/examples/example-get-started/src/train.py b/examples/example-get-started/src/train.py index 1f75cf65..eb456e14 100644 --- a/examples/example-get-started/src/train.py +++ b/examples/example-get-started/src/train.py @@ -65,6 +65,9 @@ def train(input_dir: str, output_dir: str) -> None: with open(model_file, "wb") as fd: pickle.dump(clf, fd) + _ = metawriter.log_metric("training_metrics", {"train_loss": 10}) + _ = metawriter.commit_metrics("training_metrics") + _ = metawriter.log_model( path=model_file, event="output", model_framework="SKlearn", model_type="RandomForestClassifier", model_name="RandomForestClassifier:default" diff --git a/examples/example-get-started/test-data-slice.py b/examples/example-get-started/test-data-slice.py index e442ff22..dbb5f224 100644 --- a/examples/example-get-started/test-data-slice.py +++ b/examples/example-get-started/test-data-slice.py @@ -46,7 +46,7 @@ def generate_dataset(): # Note - metadata is stored in a file called "mlmd". It is a sqlite file. # To delete earlier metadata, delete this mlmd file. -metawriter = cmf.Cmf(filename="mlmd", pipeline_name="dvc") +metawriter = cmf.Cmf(filepath="mlmd", pipeline_name="Test-env") _ = metawriter.create_context(pipeline_stage="Prepare") _ = metawriter.create_execution(execution_type="Prepare") @@ -69,6 +69,7 @@ def generate_dataset(): # Reading the files in the slice. 
df: pd.DataFrame = metawriter.read_dataslice(name="slice-1") +print("printing dataslice_path = ", metawriter.dataslice_path) record = "" row_content = None for label, content in df.iterrows(): @@ -87,11 +88,3 @@ def generate_dataset(): for label, content in df.iterrows(): if label == record: print(content) -# cleanup -folder_path = os.path.join(os.getcwd(), folder_path) -if os.path.exists(folder_path): - rmtree(folder_path) - -dvc_file = folder_path + ".dvc" -if os.path.exists(dvc_file): - os.remove(dvc_file) From 372c27f53e5682c556b16eb616f3c20dc37e2de3 Mon Sep 17 00:00:00 2001 From: Varkha Sharma Date: Wed, 24 Jul 2024 05:41:21 -0700 Subject: [PATCH 2/8] resolved folder pull issue for local artifacts --- cmflib/cmf.py | 7 +++- cmflib/commands/artifact/push.py | 4 +-- cmflib/storage_backends/local_artifacts.py | 37 ++++++++++++++++++++-- 3 files changed, 42 insertions(+), 6 deletions(-) diff --git a/cmflib/cmf.py b/cmflib/cmf.py index 1f7575c3..7972c04c 100644 --- a/cmflib/cmf.py +++ b/cmflib/cmf.py @@ -1870,7 +1870,12 @@ def commit(self, custom_properties: t.Optional[t.Dict] = None) -> None: name=dataslice_path + ":" + c_hash, type_name="Dataslice", event_type=mlpb.Event.Type.OUTPUT, - properties=props, + properties={ + "git_repo": str(git_repo), + # passing c_hash value to commit + "Commit": str(dataslice_commit), + "url": str(dvc_url_with_pipeline), + }, artifact_type_properties={ "git_repo": mlpb.STRING, "Commit": mlpb.STRING, diff --git a/cmflib/commands/artifact/push.py b/cmflib/commands/artifact/push.py index 09b3ace3..b9f115b4 100644 --- a/cmflib/commands/artifact/push.py +++ b/cmflib/commands/artifact/push.py @@ -39,7 +39,7 @@ def run(self): cmf_config=CmfConfig.read_config(cmf_config_file) out_msg = check_minio_server(dvc_config_op) if dvc_config_op["core.remote"] == "minio" and out_msg != "SUCCESS": - return out_msg + return "MinioS3 server failed to start!!!" if dvc_config_op["core.remote"] == "osdf": #print("key_id="+cmf_config["osdf-key_id"]) dynamic_password = generate_osdf_token(cmf_config["osdf-key_id"],cmf_config["osdf-key_path"],cmf_config["osdf-key_issuer"]) @@ -100,7 +100,7 @@ def run(self): file_set = set(names) result = dvc_push(list(file_set)) return result - + def add_parser(subparsers, parent_parser): HELP = "Push artifacts to the user configured artifact repo." diff --git a/cmflib/storage_backends/local_artifacts.py b/cmflib/storage_backends/local_artifacts.py index 11200414..14b0457b 100644 --- a/cmflib/storage_backends/local_artifacts.py +++ b/cmflib/storage_backends/local_artifacts.py @@ -15,7 +15,6 @@ ### import os - from dvc.api import DVCFileSystem class LocalArtifacts: @@ -32,11 +31,43 @@ def download_artifacts( dvc_config_op["remote.local-storage.url"] ) # dvc_config_op[1] is file system path - "/path/to/local/repository" # get_file() only creates file, to put artifacts in proper directory, subfolders are required. 
+        # download_loc = contains absolute path of the file with file name and extension
         temp = download_loc.split("/")
         temp.pop()
         dir_path = "/".join(temp)
-        os.makedirs(dir_path, mode=0o777, exist_ok=True)  # creating subfolders
-        obj = fs.get_file(current_loc, download_loc)
+        os.makedirs(dir_path, mode=0o777, exist_ok=True)  # creating subfolders
+        if current_loc.endswith('.dir'):
+            # in case of .dir, download_loc is a absolute path for a folder
+            os.makedirs(download_loc, mode=0o777, exist_ok=True)
+            # download the .dir file
+            temp_dir = f"{download_loc}/dir"
+            fs.get_file(current_loc, temp_dir)
+            with open(temp_dir, 'r') as file:
+                tracked_files = eval(file.read())
+            """
+            current_loc = "files/md5/9b/9a458ac0b534f088a47c2b68bae479.dir"
+            contains the path of the .dir on the artifact repo
+            we need to remove the hash of the .dir from the current_loc
+            which will leave us with the artifact repo path
+            """
+            repo_path = current_loc.split("/")
+            repo_path = repo_path[:len(repo_path)-2]
+            repo_path = "/".join(repo_path)
+            for file_info in tracked_files:
+                relpath = file_info['relpath']
+                md5_val = file_info['md5']
+                # md5_val = a237457aa730c396e5acdbc5a64c8453
+                # we need a2/37457aa730c396e5acdbc5a64c8453
+                formatted_md5 = md5_val[:2] + '/' + md5_val[2:]
+                temp_download_loc = f"{download_loc}/{relpath}"
+                temp_current_loc = f"{repo_path}/{formatted_md5}"
+                obj = fs.get_file(temp_current_loc, temp_download_loc)
+                if obj == None:
+                    print(f"object {temp_current_loc} downloaded at {temp_download_loc}.")
+            if os.path.exists(temp_dir):
+                os.remove(temp_dir)
+        else:
+            obj = fs.get_file(current_loc, download_loc)
         if obj == None:  # get_file() returns none when file gets downloaded.
             stmt = f"object {current_loc} downloaded at {download_loc}."
             return stmt

From b9083fff1c98b5783cb29b6599a67748b61fbbae Mon Sep 17 00:00:00 2001
From: Varkha Sharma
Date: Wed, 24 Jul 2024 07:20:05 -0700
Subject: [PATCH 3/8] added Dataslice related changes required for minioS3
 artifact repo

---
 cmflib/storage_backends/minio_artifacts.py | 28 ++++++++++++++++++++++
 1 file changed, 28 insertions(+)

diff --git a/cmflib/storage_backends/minio_artifacts.py b/cmflib/storage_backends/minio_artifacts.py
index aadbf880..4b115df1 100644
--- a/cmflib/storage_backends/minio_artifacts.py
+++ b/cmflib/storage_backends/minio_artifacts.py
@@ -39,6 +39,34 @@ def download_artifacts(
         if not found:
             return "Bucket doesn't exists"
         obj = client.fget_object(bucket_name, object_name, file_path)
+        if object_name.endswith('.dir'):
+            with open(file_path, 'r') as file:
+                tracked_files = eval(file.read())
+            # removing raw_data file as we need raw_data as folder
+            if os.path.exists(file_path):
+                os.remove(file_path)
+            """
+            object_name = files/md5/c9/d8fdacc0d942cf8d7d95b6301cfb97.dir
+            contains the path of the .dir on the artifact repo
+            we need to remove the hash of the .dir from the object_name
+            which will leave us with the artifact repo path
+            """
+            repo_path = object_name.split("/")
+            repo_path = repo_path[:len(repo_path)-2]
+            repo_path = "/".join(repo_path)
+            for file_info in tracked_files:
+                relpath = file_info['relpath']
+                md5_val = file_info['md5']
+                # file_path = /home/sharvark/datatslice/example-get-started/test/artifacts/raw_data
+                # md5_val = a237457aa730c396e5acdbc5a64c8453
+                # we need a2/37457aa730c396e5acdbc5a64c8453
+                formatted_md5 = md5_val[:2] + '/' + md5_val[2:]
+                temp_file_path = f"{file_path}/{relpath}"
+                temp_object_name = f"{repo_path}/{formatted_md5}"
+                #obj = fs.get_file(temp_current_loc, temp_download_loc)
+                obj = client.fget_object(bucket_name, temp_object_name, temp_file_path)
+                if obj:
+                    print(f"object {temp_object_name} downloaded at {temp_file_path}.")
         if obj:
             stmt = f"object {object_name} downloaded at {file_path}."
             return stmt

From 4025a62e6344eb9c49a22945bc09d28719f1cac6 Mon Sep 17 00:00:00 2001
From: Varkha Sharma
Date: Thu, 25 Jul 2024 00:45:51 -0700
Subject: [PATCH 4/8] making changes to fix artifact pull from amazonS3, local
 and minio repo

---
 cmflib/bin/cmf                                |  4 ++
 cmflib/storage_backends/amazonS3_artifacts.py | 46 ++++++++++++++++-
 cmflib/storage_backends/local_artifacts.py    | 50 +++++++++++--------
 cmflib/storage_backends/minio_artifacts.py    | 45 ++++++++++++-----
 4 files changed, 111 insertions(+), 34 deletions(-)

diff --git a/cmflib/bin/cmf b/cmflib/bin/cmf
index a36eb416..04881b8e 100755
--- a/cmflib/bin/cmf
+++ b/cmflib/bin/cmf
@@ -4,6 +4,10 @@ import re
 import sys
 from cmflib.cli import main
 
+# this is temporary - need to remove after TripleDES warning goes away from paramiko
+import warnings
+warnings.filterwarnings(action='ignore', module='.*paramiko.*')
+
 if __name__ == '__main__':
     sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
     sys.exit(main())

diff --git a/cmflib/storage_backends/amazonS3_artifacts.py b/cmflib/storage_backends/amazonS3_artifacts.py
index e252fb04..2c10fb36 100644
--- a/cmflib/storage_backends/amazonS3_artifacts.py
+++ b/cmflib/storage_backends/amazonS3_artifacts.py
@@ -37,12 +37,56 @@ def download_artifacts(
                 aws_session_token=session_token
             )
             s3.head_bucket(Bucket=bucket_name)
+            dir_path = ""
             if "/" in download_loc:
                 dir_path, _ = download_loc.rsplit("/", 1)
             if dir_path != "":
                 os.makedirs(dir_path, mode=0o777, exist_ok=True)  # creating subfolders if needed
-            response = s3.download_file(bucket_name, object_name, download_loc)
+
+            response = ""
+            """"
+            if object_name ends with .dir - it is a directory.
+            we download .dir object with 'temp_dir' and remove
+            this after all the files from this .dir object is downloaded.
+ """ + if object_name.endswith('.dir'): + # in case of .dir, download_loc is a absolute path for a folder + os.makedirs(download_loc, mode=0o777, exist_ok=True) + + # download .dir object + temp_dir = f"{download_loc}/temp_dir" + response = s3.download_file(bucket_name, object_name, temp_dir) + + with open(temp_dir, 'r') as file: + tracked_files = eval(file.read()) + + # removing temp_dir + if os.path.exists(temp_dir): + os.remove(temp_dir) + + """ + object_name = files/md5/c9/d8fdacc0d942cf8d7d95b6301cfb97.dir + contains the path of the .dir on the artifact repo + we need to remove the hash of the .dir from the object_name + which will leave us with the artifact repo path + """ + repo_path = "/".join(object_name.split("/")[:-2]) + for file_info in tracked_files: + relpath = file_info['relpath'] + md5_val = file_info['md5'] + # file_path = /home/user/datatslice/example-get-started/test/artifacts/raw_data + # md5_val = a237457aa730c396e5acdbc5a64c8453 + # we need a2/37457aa730c396e5acdbc5a64c8453 + formatted_md5 = md5_val[:2] + '/' + md5_val[2:] + temp_download_loc = f"{download_loc}/{relpath}" + temp_object_name = f"{repo_path}/{formatted_md5}" + obj = s3.download_file(bucket_name, temp_object_name, temp_download_loc) + if obj == None: + print(f"object {temp_object_name} downloaded at {temp_download_loc}.") + else: + # download objects which are file + response = s3.download_file(bucket_name, object_name, download_loc) if response == None: return f"{object_name} downloaded at {download_loc}" return response diff --git a/cmflib/storage_backends/local_artifacts.py b/cmflib/storage_backends/local_artifacts.py index 14b0457b..e85ef696 100644 --- a/cmflib/storage_backends/local_artifacts.py +++ b/cmflib/storage_backends/local_artifacts.py @@ -22,7 +22,7 @@ def download_artifacts( self, dvc_config_op, current_directory: str, - current_loc: str, + object_name: str, download_loc: str, ): obj = True @@ -30,46 +30,54 @@ def download_artifacts( fs = DVCFileSystem( dvc_config_op["remote.local-storage.url"] ) # dvc_config_op[1] is file system path - "/path/to/local/repository" + # get_file() only creates file, to put artifacts in proper directory, subfolders are required. 
# download_loc = contains absolute path of the file with file name and extension - temp = download_loc.split("/") - temp.pop() - dir_path = "/".join(temp) - os.makedirs(dir_path, mode=0o777, exist_ok=True) # creating subfolders - if current_loc.endswith('.dir'): + dir_path = "" + if "/" in download_loc: + dir_path, _ = download_loc.rsplit("/", 1) + if dir_path != "": + os.makedirs(dir_path, mode=0o777, exist_ok=True) # creating subfolders if needed + + response = "" + + if object_name.endswith('.dir'): # in case of .dir, download_loc is a absolute path for a folder os.makedirs(download_loc, mode=0o777, exist_ok=True) - # download the .dir file + + # download the .dir object temp_dir = f"{download_loc}/dir" - fs.get_file(current_loc, temp_dir) + response = fs.get_file(object_name, temp_dir) + with open(temp_dir, 'r') as file: tracked_files = eval(file.read()) + + # removing temp_dir + if os.path.exists(temp_dir): + os.remove(temp_dir) + """ - current_loc = "files/md5/9b/9a458ac0b534f088a47c2b68bae479.dir" + object_name = "files/md5/9b/9a458ac0b534f088a47c2b68bae479.dir" contains the path of the .dir on the artifact repo - we need to remove the hash of the .dir from the current_loc + we need to remove the hash of the .dir from the object_name which will leave us with the artifact repo path """ - repo_path = current_loc.split("/") - repo_path = repo_path[:len(repo_path)-2] - repo_path = "/".join(repo_path) + repo_path = "/".join(object_name.split("/")[:-2]) for file_info in tracked_files: relpath = file_info['relpath'] md5_val = file_info['md5'] # md5_val = a237457aa730c396e5acdbc5a64c8453 # we need a2/37457aa730c396e5acdbc5a64c8453 formatted_md5 = md5_val[:2] + '/' + md5_val[2:] + temp_object_name = f"{repo_path}/{formatted_md5}" temp_download_loc = f"{download_loc}/{relpath}" - temp_current_loc = f"{repo_path}/{formatted_md5}" - obj = fs.get_file(temp_current_loc, temp_download_loc) + obj = fs.get_file(temp_object_name, temp_download_loc) if obj == None: - print(f"object {temp_current_loc} downloaded at {temp_download_loc}.") - if os.path.exists(temp_dir): - os.remove(temp_dir) + print(f"object {temp_object_name} downloaded at {temp_download_loc}.") else: - obj = fs.get_file(current_loc, download_loc) - if obj == None: # get_file() returns none when file gets downloaded. - stmt = f"object {current_loc} downloaded at {download_loc}." + response = fs.get_file(object_name, download_loc) + if response == None: # get_file() returns none when file gets downloaded. + stmt = f"object {object_name} downloaded at {download_loc}." return stmt except TypeError as exception: return exception diff --git a/cmflib/storage_backends/minio_artifacts.py b/cmflib/storage_backends/minio_artifacts.py index 4b115df1..c3c44fa6 100644 --- a/cmflib/storage_backends/minio_artifacts.py +++ b/cmflib/storage_backends/minio_artifacts.py @@ -26,7 +26,7 @@ def download_artifacts( current_directory: str, bucket_name: str, object_name: str, - file_path: str, + download_loc: str, ): endpoint = dvc_config_op["remote.minio.endpointurl"].split("http://")[1] access_key = dvc_config_op["remote.minio.access_key_id"] @@ -38,13 +38,30 @@ def download_artifacts( found = client.bucket_exists(bucket_name) if not found: return "Bucket doesn't exists" - obj = client.fget_object(bucket_name, object_name, file_path) + + response = "" + + + """" + if object_name ends with .dir - it is a directory. + we download .dir object with 'temp_dir' and remove + this after all the files from this .dir object is downloaded. 
+ """ if object_name.endswith('.dir'): - with open(file_path, 'r') as file: + # in case of .dir, download_loc is a absolute path for a folder + os.makedirs(download_loc, mode=0o777, exist_ok=True) + + # download .dir object + temp_dir = f"{download_loc}/temp_dir" + response = client.download_file(bucket_name, object_name, temp_dir) + + with open(download_loc, 'r') as file: tracked_files = eval(file.read()) - # removing raw_data file as we need raw_data as folder - if os.path.exists(file_path): - os.remove(file_path) + + # removing temp_dir + if os.path.exists(temp_dir): + os.remove(temp_dir) + """ object_name = files/md5/c9/d8fdacc0d942cf8d7d95b6301cfb97.dir contains the path of the .dir on the artifact repo @@ -57,18 +74,22 @@ def download_artifacts( for file_info in tracked_files: relpath = file_info['relpath'] md5_val = file_info['md5'] - # file_path = /home/sharvark/datatslice/example-get-started/test/artifacts/raw_data + # download_loc = /home/sharvark/datatslice/example-get-started/test/artifacts/raw_data # md5_val = a237457aa730c396e5acdbc5a64c8453 # we need a2/37457aa730c396e5acdbc5a64c8453 formatted_md5 = md5_val[:2] + '/' + md5_val[2:] - temp_file_path = f"{file_path}/{relpath}" + temp_download_loc = f"{download_loc}/{relpath}" temp_object_name = f"{repo_path}/{formatted_md5}" #obj = fs.get_file(temp_current_loc, temp_download_loc) - obj = client.fget_object(bucket_name, temp_object_name, temp_file_path) + obj = client.fget_object(bucket_name, temp_object_name, temp_download_loc) if obj: - print(f"object {temp_object_name} downloaded at {temp_file_path}.") - if obj: - stmt = f"object {object_name} downloaded at {file_path}." + print(f"object {temp_object_name} downloaded at {temp_download_loc}.") + else: + print(f"object {temp_object_name} is not downloaded.") + else: + response = client.fget_object(bucket_name, object_name, download_loc) + if response: + stmt = f"object {object_name} downloaded at {download_loc}." return stmt else: return f"object {object_name} is not downloaded." From f2ec9127bebb8b623abde1e57417abb72115995d Mon Sep 17 00:00:00 2001 From: Varkha Sharma Date: Thu, 25 Jul 2024 02:38:36 -0700 Subject: [PATCH 5/8] made changes related to artifact pull for different artifact repos --- cmflib/cmf.py | 13 +-- cmflib/storage_backends/amazonS3_artifacts.py | 3 +- cmflib/storage_backends/local_artifacts.py | 5 ++ cmflib/storage_backends/minio_artifacts.py | 1 - .../storage_backends/sshremote_artifacts.py | 79 +++++++++++++++---- 5 files changed, 78 insertions(+), 23 deletions(-) diff --git a/cmflib/cmf.py b/cmflib/cmf.py index 7972c04c..e03b680a 100644 --- a/cmflib/cmf.py +++ b/cmflib/cmf.py @@ -1478,8 +1478,10 @@ def commit_metrics(self, metrics_name: str): Returns: Artifact object from the ML Protocol Buffers library associated with the new metrics artifact. """ - # code for nano cmf is remaining - # Assigning current file name as stage and execution name + + logging_dir = change_dir(self.cmf_init_path) + # code for nano cmf + # Assigning current file name as stage and execution name current_script = sys.argv[0] file_name = os.path.basename(current_script) name_without_extension = os.path.splitext(file_name)[0] @@ -1493,7 +1495,7 @@ def commit_metrics(self, metrics_name: str): self.create_execution(execution_type=name_without_extension) assert self.execution is not None, f"Failed to create execution for {self.pipeline_name}!!" 
- logging_dir = change_dir(self.cmf_init_path) + directory_path = os.path.join( "cmf_artifacts/metrics",self.execution.properties["Execution_uuid"].string_value) os.makedirs(directory_path, exist_ok=True) metrics_df = pd.DataFrame.from_dict( @@ -1807,7 +1809,9 @@ def commit(self, custom_properties: t.Optional[t.Dict] = None) -> None: custom_properties: Dictionary to store key value pairs associated with Dataslice Example{"mean":2.5, "median":2.6} """ - # need to understand whether doing this before or after the change_dir will make a difference + + logging_dir = change_dir(self.writer.cmf_init_path) + # code for nano cmf # Assigning current file name as stage and execution name current_script = sys.argv[0] file_name = os.path.basename(current_script) @@ -1822,7 +1826,6 @@ def commit(self, custom_properties: t.Optional[t.Dict] = None) -> None: self.writer.create_execution(execution_type=name_without_extension) assert self.writer.execution is not None, f"Failed to create execution for {self.pipeline_name}!!" - logging_dir = change_dir(self.writer.cmf_init_path) directory_path = os.path.join( "cmf_artifacts/dataslices",self.writer.execution.properties["Execution_uuid"].string_value) self.writer.dataslice_path = directory_path os.makedirs(directory_path, exist_ok=True) diff --git a/cmflib/storage_backends/amazonS3_artifacts.py b/cmflib/storage_backends/amazonS3_artifacts.py index 2c10fb36..194641ca 100644 --- a/cmflib/storage_backends/amazonS3_artifacts.py +++ b/cmflib/storage_backends/amazonS3_artifacts.py @@ -45,6 +45,7 @@ def download_artifacts( os.makedirs(dir_path, mode=0o777, exist_ok=True) # creating subfolders if needed response = "" + """" if object_name ends with .dir - it is a directory. we download .dir object with 'temp_dir' and remove @@ -75,7 +76,7 @@ def download_artifacts( for file_info in tracked_files: relpath = file_info['relpath'] md5_val = file_info['md5'] - # file_path = /home/user/datatslice/example-get-started/test/artifacts/raw_data + # download_loc = /home/user/datatslice/example-get-started/test/artifacts/raw_data # md5_val = a237457aa730c396e5acdbc5a64c8453 # we need a2/37457aa730c396e5acdbc5a64c8453 formatted_md5 = md5_val[:2] + '/' + md5_val[2:] diff --git a/cmflib/storage_backends/local_artifacts.py b/cmflib/storage_backends/local_artifacts.py index e85ef696..8626c4cb 100644 --- a/cmflib/storage_backends/local_artifacts.py +++ b/cmflib/storage_backends/local_artifacts.py @@ -41,6 +41,11 @@ def download_artifacts( response = "" + """" + if object_name ends with .dir - it is a directory. + we download .dir object with 'temp_dir' and remove + this after all the files from this .dir object is downloaded. 
+ """ if object_name.endswith('.dir'): # in case of .dir, download_loc is a absolute path for a folder os.makedirs(download_loc, mode=0o777, exist_ok=True) diff --git a/cmflib/storage_backends/minio_artifacts.py b/cmflib/storage_backends/minio_artifacts.py index c3c44fa6..80a62cf2 100644 --- a/cmflib/storage_backends/minio_artifacts.py +++ b/cmflib/storage_backends/minio_artifacts.py @@ -80,7 +80,6 @@ def download_artifacts( formatted_md5 = md5_val[:2] + '/' + md5_val[2:] temp_download_loc = f"{download_loc}/{relpath}" temp_object_name = f"{repo_path}/{formatted_md5}" - #obj = fs.get_file(temp_current_loc, temp_download_loc) obj = client.fget_object(bucket_name, temp_object_name, temp_download_loc) if obj: print(f"object {temp_object_name} downloaded at {temp_download_loc}.") diff --git a/cmflib/storage_backends/sshremote_artifacts.py b/cmflib/storage_backends/sshremote_artifacts.py index 5d407bbf..55ee18c2 100644 --- a/cmflib/storage_backends/sshremote_artifacts.py +++ b/cmflib/storage_backends/sshremote_artifacts.py @@ -17,6 +17,9 @@ import os import paramiko +# this is temporary - need to remove after TripleDES warning goes away from paramiko +import warnings +warnings.filterwarnings(action='ignore', module='.*paramiko.*') class SSHremoteArtifacts: def download_artifacts( @@ -24,10 +27,9 @@ def download_artifacts( dvc_config_op, host: str, current_directory: str, - remote_file_path: str, - local_path: str, + object_name: str, + download_loc: str, ): - output = "" remote_repo = dvc_config_op["remote.ssh-storage.url"] user = dvc_config_op["remote.ssh-storage.user"] password = dvc_config_op["remote.ssh-storage.password"] @@ -38,21 +40,66 @@ def download_artifacts( ) # this can lead to man in the middle attack, need to find another solution ssh.connect(host, username=user, password=password) sftp = ssh.open_sftp() - temp = local_path.split("/") - temp.pop() - dir_path = "/".join(temp) - dir_to_create = os.path.join(current_directory, dir_path) - os.makedirs( - dir_to_create, mode=0o777, exist_ok=True - ) # creates subfolders needed as per artifacts folder structure - local_file_path = os.path.join(current_directory, local_path) - local_file_path = os.path.abspath(local_file_path) - output = sftp.put(remote_file_path, local_file_path) + + dir_path = "" + if "/" in download_loc: + dir_path, _ = download_loc.rsplit("/", 1) + if dir_path != "": + # creates subfolders needed as per artifacts folder structure + os.makedirs(dir_path, mode=0o777, exist_ok=True) + + response = "" + + abs_download_loc = os.path.abspath(os.path.join(current_directory, download_loc)) + + """" + if object_name ends with .dir - it is a directory. + we download .dir object with 'temp_dir' and remove + this after all the files from this .dir object is downloaded. 
+            """
+            if object_name.endswith('.dir'):
+                # in case of .dir, abs_download_loc is an absolute path for a folder
+                os.makedirs(abs_download_loc, mode=0o777, exist_ok=True)
+
+                # download the .dir object; sftp.get(remote, local) downloads,
+                # whereas sftp.put() would upload, so get is the right call here
+                temp_dir = f"{abs_download_loc}/temp_dir"
+                sftp.get(object_name, temp_dir)
+
+                with open(temp_dir, 'r') as file:
+                    tracked_files = eval(file.read())
+
+                # removing temp_dir
+                if os.path.exists(temp_dir):
+                    os.remove(temp_dir)
+
+                """
+                object_name = /home/user/ssh-storage/files/md5/dd/2d792b7cf6efb02231f85c6147e403.dir
+                contains the path of the .dir on the artifact repo
+                we need to remove the hash of the .dir from the object_name
+                which will leave us with the artifact repo path
+                """
+                repo_path = "/".join(object_name.split("/")[:-2])
+                for file_info in tracked_files:
+                    relpath = file_info['relpath']
+                    md5_val = file_info['md5']
+                    # download_loc = /home/user/dataslice/example-get-started/test/artifacts/raw_data
+                    # md5_val = a237457aa730c396e5acdbc5a64c8453
+                    # we need a2/37457aa730c396e5acdbc5a64c8453
+                    formatted_md5 = md5_val[:2] + '/' + md5_val[2:]
+                    temp_download_loc = f"{abs_download_loc}/{relpath}"
+                    temp_object_name = f"{repo_path}/{formatted_md5}"
+                    # sftp.get() raises on failure and returns None on success,
+                    # so no truthiness check of the return value is needed
+                    sftp.get(temp_object_name, temp_download_loc)
+                    print(f"object {temp_object_name} downloaded at {temp_download_loc}.")
+            else:
+                sftp.get(object_name, abs_download_loc)
+
+            stmt = f"object {object_name} downloaded at {abs_download_loc}."
+
             sftp.close()
             ssh.close()
-            if output:
-                stmt = f"object {remote_file_path} downloaded at {local_file_path}."
-                return stmt
+            return stmt
         except TypeError as exception:
             return exception

From 733f437ed019836dd05338c4b2602f012a54fce2 Mon Sep 17 00:00:00 2001
From: Varkha Sharma
Date: Thu, 25 Jul 2024 03:44:41 -0700
Subject: [PATCH 6/8] addressing review comments

---
 cmflib/cmf.py                                   | 6 ++----
 examples/example-get-started/test-data-slice.py | 1 -
 2 files changed, 2 insertions(+), 5 deletions(-)

diff --git a/cmflib/cmf.py b/cmflib/cmf.py
index e03b680a..a7815c64 100644
--- a/cmflib/cmf.py
+++ b/cmflib/cmf.py
@@ -132,7 +132,6 @@ def __init__(
         config.sqlite.filename_uri = filepath
         self.store = metadata_store.MetadataStore(config)
         self.filepath = filepath
-        self.dataslice_path = None
         self.child_context = None
         self.execution = None
         self.execution_name = ""
@@ -1722,7 +1721,7 @@ def create_dataslice(self, name: str) -> "Cmf.DataSlice":
     def read_dataslice(self, name: str) -> pd.DataFrame:
         """Reads the dataslice"""
         # To do checkout if not there
-        name = os.path.join(self.dataslice_path, name)
+        name = name
         df = pd.read_parquet(name)
         return df
 
@@ -1743,7 +1742,7 @@ def update_dataslice(self, name: str, record: str, custom_properties: t.Dict):
         Returns:
             None
         """
-        name = os.path.join(self.dataslice_path, name)
+        name = name
         df = pd.read_parquet(name)
         temp_dict = df.to_dict("index")
         temp_dict[record].update(custom_properties)
@@ -1827,7 +1826,6 @@ def commit(self, custom_properties: t.Optional[t.Dict] = None) -> None:
         assert self.writer.execution is not None, f"Failed to create execution for {self.pipeline_name}!!"
directory_path = os.path.join( "cmf_artifacts/dataslices",self.writer.execution.properties["Execution_uuid"].string_value) - self.writer.dataslice_path = directory_path os.makedirs(directory_path, exist_ok=True) custom_props = {} if custom_properties is None else custom_properties git_repo = git_get_repo() diff --git a/examples/example-get-started/test-data-slice.py b/examples/example-get-started/test-data-slice.py index dbb5f224..a25d1c95 100644 --- a/examples/example-get-started/test-data-slice.py +++ b/examples/example-get-started/test-data-slice.py @@ -69,7 +69,6 @@ def generate_dataset(): # Reading the files in the slice. df: pd.DataFrame = metawriter.read_dataslice(name="slice-1") -print("printing dataslice_path = ", metawriter.dataslice_path) record = "" row_content = None for label, content in df.iterrows(): From bd23e4c66ca641b5710b1a87a34a6c5bd7ecc5fc Mon Sep 17 00:00:00 2001 From: Varkha Sharma Date: Thu, 25 Jul 2024 03:56:03 -0700 Subject: [PATCH 7/8] addressing review comments --- cmflib/cmf.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cmflib/cmf.py b/cmflib/cmf.py index a7815c64..7c1d2f38 100644 --- a/cmflib/cmf.py +++ b/cmflib/cmf.py @@ -1721,7 +1721,8 @@ def create_dataslice(self, name: str) -> "Cmf.DataSlice": def read_dataslice(self, name: str) -> pd.DataFrame: """Reads the dataslice""" # To do checkout if not there - name = name + directory_path = os.path.join("cmf_artifacts/dataslices",self.execution.properties["Execution_uuid"].string_value) + name = os.path.join(directory_path, name) df = pd.read_parquet(name) return df @@ -1742,7 +1743,8 @@ def update_dataslice(self, name: str, record: str, custom_properties: t.Dict): Returns: None """ - name = name + directory_path = os.path.join("cmf_artifacts/dataslices", self.execution.properties["Execution_uuid"].string_value) + name = os.path.join(directory_path, name) df = pd.read_parquet(name) temp_dict = df.to_dict("index") temp_dict[record].update(custom_properties) From 6b287746cecb7b6822f8538922a1ab19cbde92bb Mon Sep 17 00:00:00 2001 From: Varkha Sharma Date: Thu, 25 Jul 2024 04:05:54 -0700 Subject: [PATCH 8/8] resolving some testing errors --- cmflib/storage_backends/minio_artifacts.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cmflib/storage_backends/minio_artifacts.py b/cmflib/storage_backends/minio_artifacts.py index 80a62cf2..fa2ca7ad 100644 --- a/cmflib/storage_backends/minio_artifacts.py +++ b/cmflib/storage_backends/minio_artifacts.py @@ -41,7 +41,6 @@ def download_artifacts( response = "" - """" if object_name ends with .dir - it is a directory. we download .dir object with 'temp_dir' and remove @@ -53,9 +52,9 @@ def download_artifacts( # download .dir object temp_dir = f"{download_loc}/temp_dir" - response = client.download_file(bucket_name, object_name, temp_dir) + response = client.fget_object(bucket_name, object_name, temp_dir) - with open(download_loc, 'r') as file: + with open(temp_dir, 'r') as file: tracked_files = eval(file.read()) # removing temp_dir
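Taken together, the series moves step metrics to `cmf_artifacts/metrics/<Execution_uuid>/` and dataslices to `cmf_artifacts/dataslices/<Execution_uuid>/`, records `Commit` and `url` as typed artifact properties, and teaches the local, MinIO, Amazon S3, and SSH backends to expand `.dir` objects file by file on `cmf artifact pull`. A minimal sketch of the resulting logging flow is shown below; the pipeline name, data path, and metric values are illustrative, borrowed from `train.py` and `test-data-slice.py` above rather than mandated by the patches:

```python
from cmflib import cmf

# Metadata lands in an "mlmd" SQLite file, as in test-data-slice.py.
metawriter = cmf.Cmf(filepath="mlmd", pipeline_name="Test-env")
_ = metawriter.create_context(pipeline_stage="Prepare")
_ = metawriter.create_execution(execution_type="Prepare")

# Step metrics: accumulated per step, then committed as a parquet artifact
# under cmf_artifacts/metrics/<Execution_uuid>/training_metrics.
_ = metawriter.log_metric("training_metrics", {"train_loss": 10})
_ = metawriter.commit_metrics("training_metrics")

# Dataslice: committed under cmf_artifacts/dataslices/<Execution_uuid>/slice-1;
# after patch 7, read_dataslice()/update_dataslice() resolve that path internally.
dataslice = metawriter.create_dataslice(name="slice-1")
dataslice.add_data("artifacts/raw_data/dataset_0", {"key1": "value1"})
dataslice.commit()
df = metawriter.read_dataslice(name="slice-1")
```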