From d1457ca6820e61f71967690bb0ab95f2ca205df0 Mon Sep 17 00:00:00 2001
From: Varkha Sharma <112053040+varkha-d-sharma@users.noreply.github.com>
Date: Thu, 25 Jul 2024 21:05:46 +0530
Subject: [PATCH] Dataslice and StepMetrics: artifact and metadata pull and push related changes (#189)

* changes made for dataslice and metrics artifact and metadata pull/push
* resolved folder pull issue for local artifacts
* added Dataslice related changes required for minioS3 artifact repo
* making changes to fix artifact pull from amazonS3, local and minio repo
* made changes related to artifact pull for different artifact repos
* addressing review comments
* addressing review comments
* resolving some testing errors
---
 cmflib/bin/cmf                                |   4 +
 cmflib/cmf.py                                 | 175 +++++++++++++-----
 cmflib/cmf_merger.py                          |   4 +-
 cmflib/commands/artifact/push.py              |   4 +-
 cmflib/storage_backends/amazonS3_artifacts.py |  47 ++++-
 cmflib/storage_backends/local_artifacts.py    |  62 ++++++-
 cmflib/storage_backends/minio_artifacts.py    |  55 +++++-
 .../storage_backends/sshremote_artifacts.py   |  79 ++++++--
 examples/example-get-started/src/train.py     |   3 +
 .../example-get-started/test-data-slice.py    |  10 +-
 10 files changed, 358 insertions(+), 85 deletions(-)

diff --git a/cmflib/bin/cmf b/cmflib/bin/cmf
index a36eb416..04881b8e 100755
--- a/cmflib/bin/cmf
+++ b/cmflib/bin/cmf
@@ -4,6 +4,10 @@ import re
 import sys
 from cmflib.cli import main
 
+# this is temporary - need to remove after TripleDES warning goes away from paramiko
+import warnings
+warnings.filterwarnings(action='ignore', module='.*paramiko.*')
+
 if __name__ == '__main__':
     sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
     sys.exit(main())
diff --git a/cmflib/cmf.py b/cmflib/cmf.py
index 4b795bb5..7c1d2f38 100644
--- a/cmflib/cmf.py
+++ b/cmflib/cmf.py
@@ -646,7 +646,6 @@ def log_dataset(
         Returns:
             Artifact object from ML Metadata library associated with the new dataset artifact.
         """
-        # Assigning current file name as stage and execution name
         current_script = sys.argv[0]
         file_name = os.path.basename(current_script)
@@ -679,7 +678,7 @@ def log_dataset(
 
         if c_hash == "":
             print("Error in getting the dvc hash,return without logging")
-            return null
+            return
 
         dataset_commit = c_hash
         dvc_url = dvc_get_url(url)
@@ -1033,7 +1032,7 @@ def log_model(
 
         if c_hash == "":
             print("Error in getting the dvc hash,return without logging")
-            return null
+            return
 
         model_commit = c_hash
 
@@ -1478,20 +1477,42 @@ def commit_metrics(self, metrics_name: str):
         Returns:
             Artifact object from the ML Protocol Buffers library associated with the new metrics artifact.
         """
+        logging_dir = change_dir(self.cmf_init_path)
+        # code for nano cmf
+        # Assigning current file name as stage and execution name
+        current_script = sys.argv[0]
+        file_name = os.path.basename(current_script)
+        name_without_extension = os.path.splitext(file_name)[0]
+        # create context if not already created
+        if not self.child_context:
+            self.create_context(pipeline_stage=name_without_extension)
+            assert self.child_context is not None, f"Failed to create context for {self.pipeline_name}!!"
+
+        # create execution if not already created
+        if not self.execution:
+            self.create_execution(execution_type=name_without_extension)
+            assert self.execution is not None, f"Failed to create execution for {self.pipeline_name}!!"
+
+
+        directory_path = os.path.join( "cmf_artifacts/metrics",self.execution.properties["Execution_uuid"].string_value)
+        os.makedirs(directory_path, exist_ok=True)
         metrics_df = pd.DataFrame.from_dict(
             self.metrics[metrics_name], orient="index")
         metrics_df.index.names = ["SequenceNumber"]
-        metrics_df.to_parquet(metrics_name)
-        commit_output(metrics_name, self.execution.id)
-        uri = dvc_get_hash(metrics_name)
+        metrics_path = os.path.join(directory_path,metrics_name)
+        metrics_df.to_parquet(metrics_path)
+        commit_output(metrics_path, self.execution.id)
+        uri = dvc_get_hash(metrics_path)
 
         if uri == "":
             print("Error in getting the dvc hash,return without logging")
-            return null
+            return
 
         metrics_commit = uri
+        dvc_url = dvc_get_url(metrics_path)
+        dvc_url_with_pipeline = f"{self.parent_context.name}:{dvc_url}"
         name = (
-            metrics_name
+            metrics_path
             + ":"
             + uri
             + ":"
@@ -1499,8 +1520,10 @@ def commit_metrics(self, metrics_name: str):
             + ":"
             + str(uuid.uuid1())
         )
-        # passing uri value to commit
-        custom_props = {"Name": metrics_name, "Commit": metrics_commit}
+        # not needed as property 'name' is part of artifact
+        # to maintain uniformity - Commit goes in properties of the artifact
+        # custom_props = {"Name": metrics_name, "Commit": metrics_commit}
+        custom_props = {}
         metrics = create_new_artifact_event_and_attribution(
             store=self.store,
             execution_id=self.execution.id,
@@ -1509,6 +1532,15 @@ def commit_metrics(self, metrics_name: str):
             name=name,
             type_name="Step_Metrics",
             event_type=mlpb.Event.Type.OUTPUT,
+            properties={
+                # passing uri value to commit
+                "Commit": metrics_commit,
+                "url": str(dvc_url_with_pipeline),
+            },
+            artifact_type_properties={
+                "Commit": mlpb.STRING,
+                "url": mlpb.STRING,
+            },
             custom_properties=custom_props,
             milliseconds_since_epoch=int(time.time() * 1000),
         )
@@ -1537,20 +1569,20 @@ def commit_metrics(self, metrics_name: str):
             os.chdir(logging_dir)
         return metrics
 
-    def commit_existing_metrics(self, metrics_name: str, uri: str, custom_properties: t.Optional[t.Dict] = None):
-        """
-        Commits existing metrics associated with the given URI to MLMD.
-        Example:
-        ```python
-        artifact: mlpb.Artifact = cmf.commit_existing_metrics("existing_metrics", "abc123",
-        {"custom_key": "custom_value"})
-        ```
-        Args:
-        metrics_name: Name of the metrics.
-        uri: Unique identifier associated with the metrics.
-        custom_properties: Optional custom properties for the metrics.
+    def commit_existing_metrics(self, metrics_name: str, uri: str, props: t.Optional[t.Dict] = None, custom_properties: t.Optional[t.Dict] = None):
+        """
+        Commits existing metrics associated with the given URI to MLMD.
+        Example:
+        ```python
+        artifact: mlpb.Artifact = cmf.commit_existing_metrics("existing_metrics", "abc123",
+        {"custom_key": "custom_value"})
+        ```
+        Args:
+            metrics_name: Name of the metrics.
+            uri: Unique identifier associated with the metrics.
+            custom_properties: Optional custom properties for the metrics.
         Returns:
-        Artifact object from the ML Protocol Buffers library associated with the existing metrics artifact.
+            Artifact object from the ML Protocol Buffers library associated with the existing metrics artifact.
""" custom_props = {} if custom_properties is None else custom_properties @@ -1575,6 +1607,15 @@ def commit_existing_metrics(self, metrics_name: str, uri: str, custom_properties name=metrics_name, type_name="Step_Metrics", event_type=mlpb.Event.Type.OUTPUT, + properties={ + # passing uri value to commit + "Commit": props.get("Commit", ""), + "url": props.get("url", ""), + }, + artifact_type_properties={ + "Commit": mlpb.STRING, + "url": mlpb.STRING, + }, custom_properties=custom_props, milliseconds_since_epoch=int(time.time() * 1000), ) @@ -1680,6 +1721,8 @@ def create_dataslice(self, name: str) -> "Cmf.DataSlice": def read_dataslice(self, name: str) -> pd.DataFrame: """Reads the dataslice""" # To do checkout if not there + directory_path = os.path.join("cmf_artifacts/dataslices",self.execution.properties["Execution_uuid"].string_value) + name = os.path.join(directory_path, name) df = pd.read_parquet(name) return df @@ -1700,6 +1743,8 @@ def update_dataslice(self, name: str, record: str, custom_properties: t.Dict): Returns: None """ + directory_path = os.path.join("cmf_artifacts/dataslices", self.execution.properties["Execution_uuid"].string_value) + name = os.path.join(directory_path, name) df = pd.read_parquet(name) temp_dict = df.to_dict("index") temp_dict[record].update(custom_properties) @@ -1739,7 +1784,7 @@ def add_data( """ self.props[path] = {} - # self.props[path]['hash'] = dvc_get_hash(path) + self.props[path]['hash'] = dvc_get_hash(path) parent_path = path.rsplit("/", 1)[0] self.data_parent = parent_path.rsplit("/", 1)[1] if custom_properties: @@ -1765,59 +1810,94 @@ def commit(self, custom_properties: t.Optional[t.Dict] = None) -> None: custom_properties: Dictionary to store key value pairs associated with Dataslice Example{"mean":2.5, "median":2.6} """ + + logging_dir = change_dir(self.writer.cmf_init_path) + # code for nano cmf + # Assigning current file name as stage and execution name + current_script = sys.argv[0] + file_name = os.path.basename(current_script) + name_without_extension = os.path.splitext(file_name)[0] + # create context if not already created + if not self.writer.child_context: + self.writer.create_context(pipeline_stage=name_without_extension) + assert self.writer.child_context is not None, f"Failed to create context for {self.pipeline_name}!!" + + # create execution if not already created + if not self.writer.execution: + self.writer.create_execution(execution_type=name_without_extension) + assert self.writer.execution is not None, f"Failed to create execution for {self.pipeline_name}!!" 
+ + directory_path = os.path.join( "cmf_artifacts/dataslices",self.writer.execution.properties["Execution_uuid"].string_value) + os.makedirs(directory_path, exist_ok=True) custom_props = {} if custom_properties is None else custom_properties git_repo = git_get_repo() dataslice_df = pd.DataFrame.from_dict(self.props, orient="index") dataslice_df.index.names = ["Path"] - dataslice_df.to_parquet(self.name) + dataslice_path = os.path.join(directory_path,self.name) + dataslice_df.to_parquet(dataslice_path) existing_artifact = [] - commit_output(self.name, self.writer.execution.id) - c_hash = dvc_get_hash(self.name) + commit_output(dataslice_path, self.writer.execution.id) + c_hash = dvc_get_hash(dataslice_path) if c_hash == "": print("Error in getting the dvc hash,return without logging") - return null + return dataslice_commit = c_hash - remote = dvc_get_url(self.name) + url = dvc_get_url(dataslice_path) + dvc_url_with_pipeline = f"{self.writer.parent_context.name}:{url}" if c_hash and c_hash.strip(): existing_artifact.extend( self.writer.store.get_artifacts_by_uri(c_hash)) if existing_artifact and len(existing_artifact) != 0: print("Adding to existing data slice") + # Haven't added event type in this if cond, is it not needed?? slice = link_execution_to_input_artifact( store=self.writer.store, execution_id=self.writer.execution.id, uri=c_hash, - input_name=self.name + ":" + c_hash, + input_name=dataslice_path + ":" + c_hash, ) else: - props = { - "Commit": dataslice_commit, # passing c_hash value to commit - "git_repo": git_repo, - "Remote": remote, - } - props.update(custom_props) + props={ + "git_repo": str(git_repo), + # passing c_hash value to commit + "Commit": str(dataslice_commit), + "url": str(dvc_url_with_pipeline), + }, slice = create_new_artifact_event_and_attribution( store=self.writer.store, execution_id=self.writer.execution.id, context_id=self.writer.child_context.id, uri=c_hash, - name=self.name + ":" + c_hash, + name=dataslice_path + ":" + c_hash, type_name="Dataslice", event_type=mlpb.Event.Type.OUTPUT, - custom_properties=props, + properties={ + "git_repo": str(git_repo), + # passing c_hash value to commit + "Commit": str(dataslice_commit), + "url": str(dvc_url_with_pipeline), + }, + artifact_type_properties={ + "git_repo": mlpb.STRING, + "Commit": mlpb.STRING, + "url": mlpb.STRING, + }, + custom_properties=custom_props, milliseconds_since_epoch=int(time.time() * 1000), ) if self.writer.graph: self.writer.driver.create_dataslice_node( - self.name, self.name + ":" + c_hash, c_hash, self.data_parent, props + self.name, dataslice_path + ":" + c_hash, c_hash, self.data_parent, props ) + os.chdir(logging_dir) return slice - def commit_existing(self, uri: str, custom_properties: t.Optional[t.Dict] = None) -> None: + # commit existing dataslice to server + def commit_existing(self, uri: str, props: t.Optional[t.Dict] = None, custom_properties: t.Optional[t.Dict] = None) -> None: custom_props = {} if custom_properties is None else custom_properties - c_hash = uri + c_hash = uri.strip() dataslice_commit = c_hash existing_artifact = [] if c_hash and c_hash.strip(): @@ -1825,11 +1905,12 @@ def commit_existing(self, uri: str, custom_properties: t.Optional[t.Dict] = None self.writer.store.get_artifacts_by_uri(c_hash)) if existing_artifact and len(existing_artifact) != 0: print("Adding to existing data slice") + # Haven't added event type in this if cond, is it not needed?? 
slice = link_execution_to_input_artifact( store=self.writer.store, execution_id=self.writer.execution.id, uri=c_hash, - input_name=self.name + input_name=self.name, ) else: slice = create_new_artifact_event_and_attribution( @@ -1840,6 +1921,16 @@ def commit_existing(self, uri: str, custom_properties: t.Optional[t.Dict] = None name=self.name, type_name="Dataslice", event_type=mlpb.Event.Type.OUTPUT, + properties={ + "git_repo": props.get("git_repo", ""), + "Commit": props.get("Commit", ""), + "url": props.get("url", " "), + }, + artifact_type_properties={ + "git_repo": mlpb.STRING, + "Commit": mlpb.STRING, + "url": mlpb.STRING, + }, custom_properties=custom_properties, milliseconds_since_epoch=int(time.time() * 1000), ) diff --git a/cmflib/cmf_merger.py b/cmflib/cmf_merger.py index c8519673..d2be110b 100644 --- a/cmflib/cmf_merger.py +++ b/cmflib/cmf_merger.py @@ -108,9 +108,9 @@ def parse_json_to_mlmd(mlmd_json, path_to_store, cmd, exec_id): cmf_class.log_execution_metrics_from_client(event["artifact"]["name"], custom_props) elif artifact_type == "Dataslice": dataslice = cmf_class.create_dataslice(event["artifact"]["name"]) - dataslice.commit_existing(uri, custom_props) + dataslice.commit_existing(uri, props, custom_props) elif artifact_type == "Step_Metrics": - cmf_class.commit_existing_metrics(event["artifact"]["name"], uri, custom_props) + cmf_class.commit_existing_metrics(event["artifact"]["name"], uri, props, custom_props) else: pass diff --git a/cmflib/commands/artifact/push.py b/cmflib/commands/artifact/push.py index 09b3ace3..b9f115b4 100644 --- a/cmflib/commands/artifact/push.py +++ b/cmflib/commands/artifact/push.py @@ -39,7 +39,7 @@ def run(self): cmf_config=CmfConfig.read_config(cmf_config_file) out_msg = check_minio_server(dvc_config_op) if dvc_config_op["core.remote"] == "minio" and out_msg != "SUCCESS": - return out_msg + return "MinioS3 server failed to start!!!" if dvc_config_op["core.remote"] == "osdf": #print("key_id="+cmf_config["osdf-key_id"]) dynamic_password = generate_osdf_token(cmf_config["osdf-key_id"],cmf_config["osdf-key_path"],cmf_config["osdf-key_issuer"]) @@ -100,7 +100,7 @@ def run(self): file_set = set(names) result = dvc_push(list(file_set)) return result - + def add_parser(subparsers, parent_parser): HELP = "Push artifacts to the user configured artifact repo." diff --git a/cmflib/storage_backends/amazonS3_artifacts.py b/cmflib/storage_backends/amazonS3_artifacts.py index e252fb04..194641ca 100644 --- a/cmflib/storage_backends/amazonS3_artifacts.py +++ b/cmflib/storage_backends/amazonS3_artifacts.py @@ -37,12 +37,57 @@ def download_artifacts( aws_session_token=session_token ) s3.head_bucket(Bucket=bucket_name) + dir_path = "" if "/" in download_loc: dir_path, _ = download_loc.rsplit("/", 1) if dir_path != "": os.makedirs(dir_path, mode=0o777, exist_ok=True) # creating subfolders if needed - response = s3.download_file(bucket_name, object_name, download_loc) + + response = "" + + """" + if object_name ends with .dir - it is a directory. + we download .dir object with 'temp_dir' and remove + this after all the files from this .dir object is downloaded. 
+ """ + if object_name.endswith('.dir'): + # in case of .dir, download_loc is a absolute path for a folder + os.makedirs(download_loc, mode=0o777, exist_ok=True) + + # download .dir object + temp_dir = f"{download_loc}/temp_dir" + response = s3.download_file(bucket_name, object_name, temp_dir) + + with open(temp_dir, 'r') as file: + tracked_files = eval(file.read()) + + # removing temp_dir + if os.path.exists(temp_dir): + os.remove(temp_dir) + + """ + object_name = files/md5/c9/d8fdacc0d942cf8d7d95b6301cfb97.dir + contains the path of the .dir on the artifact repo + we need to remove the hash of the .dir from the object_name + which will leave us with the artifact repo path + """ + repo_path = "/".join(object_name.split("/")[:-2]) + for file_info in tracked_files: + relpath = file_info['relpath'] + md5_val = file_info['md5'] + # download_loc = /home/user/datatslice/example-get-started/test/artifacts/raw_data + # md5_val = a237457aa730c396e5acdbc5a64c8453 + # we need a2/37457aa730c396e5acdbc5a64c8453 + formatted_md5 = md5_val[:2] + '/' + md5_val[2:] + temp_download_loc = f"{download_loc}/{relpath}" + temp_object_name = f"{repo_path}/{formatted_md5}" + obj = s3.download_file(bucket_name, temp_object_name, temp_download_loc) + if obj == None: + print(f"object {temp_object_name} downloaded at {temp_download_loc}.") + else: + # download objects which are file + response = s3.download_file(bucket_name, object_name, download_loc) if response == None: return f"{object_name} downloaded at {download_loc}" return response diff --git a/cmflib/storage_backends/local_artifacts.py b/cmflib/storage_backends/local_artifacts.py index 11200414..8626c4cb 100644 --- a/cmflib/storage_backends/local_artifacts.py +++ b/cmflib/storage_backends/local_artifacts.py @@ -15,7 +15,6 @@ ### import os - from dvc.api import DVCFileSystem class LocalArtifacts: @@ -23,7 +22,7 @@ def download_artifacts( self, dvc_config_op, current_directory: str, - current_loc: str, + object_name: str, download_loc: str, ): obj = True @@ -31,14 +30,59 @@ def download_artifacts( fs = DVCFileSystem( dvc_config_op["remote.local-storage.url"] ) # dvc_config_op[1] is file system path - "/path/to/local/repository" + # get_file() only creates file, to put artifacts in proper directory, subfolders are required. - temp = download_loc.split("/") - temp.pop() - dir_path = "/".join(temp) - os.makedirs(dir_path, mode=0o777, exist_ok=True) # creating subfolders - obj = fs.get_file(current_loc, download_loc) - if obj == None: # get_file() returns none when file gets downloaded. - stmt = f"object {current_loc} downloaded at {download_loc}." + # download_loc = contains absolute path of the file with file name and extension + dir_path = "" + if "/" in download_loc: + dir_path, _ = download_loc.rsplit("/", 1) + if dir_path != "": + os.makedirs(dir_path, mode=0o777, exist_ok=True) # creating subfolders if needed + + response = "" + + """" + if object_name ends with .dir - it is a directory. + we download .dir object with 'temp_dir' and remove + this after all the files from this .dir object is downloaded. 
+ """ + if object_name.endswith('.dir'): + # in case of .dir, download_loc is a absolute path for a folder + os.makedirs(download_loc, mode=0o777, exist_ok=True) + + # download the .dir object + temp_dir = f"{download_loc}/dir" + response = fs.get_file(object_name, temp_dir) + + with open(temp_dir, 'r') as file: + tracked_files = eval(file.read()) + + # removing temp_dir + if os.path.exists(temp_dir): + os.remove(temp_dir) + + """ + object_name = "files/md5/9b/9a458ac0b534f088a47c2b68bae479.dir" + contains the path of the .dir on the artifact repo + we need to remove the hash of the .dir from the object_name + which will leave us with the artifact repo path + """ + repo_path = "/".join(object_name.split("/")[:-2]) + for file_info in tracked_files: + relpath = file_info['relpath'] + md5_val = file_info['md5'] + # md5_val = a237457aa730c396e5acdbc5a64c8453 + # we need a2/37457aa730c396e5acdbc5a64c8453 + formatted_md5 = md5_val[:2] + '/' + md5_val[2:] + temp_object_name = f"{repo_path}/{formatted_md5}" + temp_download_loc = f"{download_loc}/{relpath}" + obj = fs.get_file(temp_object_name, temp_download_loc) + if obj == None: + print(f"object {temp_object_name} downloaded at {temp_download_loc}.") + else: + response = fs.get_file(object_name, download_loc) + if response == None: # get_file() returns none when file gets downloaded. + stmt = f"object {object_name} downloaded at {download_loc}." return stmt except TypeError as exception: return exception diff --git a/cmflib/storage_backends/minio_artifacts.py b/cmflib/storage_backends/minio_artifacts.py index aadbf880..fa2ca7ad 100644 --- a/cmflib/storage_backends/minio_artifacts.py +++ b/cmflib/storage_backends/minio_artifacts.py @@ -26,7 +26,7 @@ def download_artifacts( current_directory: str, bucket_name: str, object_name: str, - file_path: str, + download_loc: str, ): endpoint = dvc_config_op["remote.minio.endpointurl"].split("http://")[1] access_key = dvc_config_op["remote.minio.access_key_id"] @@ -38,9 +38,56 @@ def download_artifacts( found = client.bucket_exists(bucket_name) if not found: return "Bucket doesn't exists" - obj = client.fget_object(bucket_name, object_name, file_path) - if obj: - stmt = f"object {object_name} downloaded at {file_path}." + + response = "" + + """" + if object_name ends with .dir - it is a directory. + we download .dir object with 'temp_dir' and remove + this after all the files from this .dir object is downloaded. 
+ """ + if object_name.endswith('.dir'): + # in case of .dir, download_loc is a absolute path for a folder + os.makedirs(download_loc, mode=0o777, exist_ok=True) + + # download .dir object + temp_dir = f"{download_loc}/temp_dir" + response = client.fget_object(bucket_name, object_name, temp_dir) + + with open(temp_dir, 'r') as file: + tracked_files = eval(file.read()) + + # removing temp_dir + if os.path.exists(temp_dir): + os.remove(temp_dir) + + """ + object_name = files/md5/c9/d8fdacc0d942cf8d7d95b6301cfb97.dir + contains the path of the .dir on the artifact repo + we need to remove the hash of the .dir from the object_name + which will leave us with the artifact repo path + """ + repo_path = object_name.split("/") + repo_path = repo_path[:len(repo_path)-2] + repo_path = "/".join(repo_path) + for file_info in tracked_files: + relpath = file_info['relpath'] + md5_val = file_info['md5'] + # download_loc = /home/sharvark/datatslice/example-get-started/test/artifacts/raw_data + # md5_val = a237457aa730c396e5acdbc5a64c8453 + # we need a2/37457aa730c396e5acdbc5a64c8453 + formatted_md5 = md5_val[:2] + '/' + md5_val[2:] + temp_download_loc = f"{download_loc}/{relpath}" + temp_object_name = f"{repo_path}/{formatted_md5}" + obj = client.fget_object(bucket_name, temp_object_name, temp_download_loc) + if obj: + print(f"object {temp_object_name} downloaded at {temp_download_loc}.") + else: + print(f"object {temp_object_name} is not downloaded.") + else: + response = client.fget_object(bucket_name, object_name, download_loc) + if response: + stmt = f"object {object_name} downloaded at {download_loc}." return stmt else: return f"object {object_name} is not downloaded." diff --git a/cmflib/storage_backends/sshremote_artifacts.py b/cmflib/storage_backends/sshremote_artifacts.py index 5d407bbf..55ee18c2 100644 --- a/cmflib/storage_backends/sshremote_artifacts.py +++ b/cmflib/storage_backends/sshremote_artifacts.py @@ -17,6 +17,9 @@ import os import paramiko +# this is temporary - need to remove after TripleDES warning goes away from paramiko +import warnings +warnings.filterwarnings(action='ignore', module='.*paramiko.*') class SSHremoteArtifacts: def download_artifacts( @@ -24,10 +27,9 @@ def download_artifacts( dvc_config_op, host: str, current_directory: str, - remote_file_path: str, - local_path: str, + object_name: str, + download_loc: str, ): - output = "" remote_repo = dvc_config_op["remote.ssh-storage.url"] user = dvc_config_op["remote.ssh-storage.user"] password = dvc_config_op["remote.ssh-storage.password"] @@ -38,21 +40,66 @@ def download_artifacts( ) # this can lead to man in the middle attack, need to find another solution ssh.connect(host, username=user, password=password) sftp = ssh.open_sftp() - temp = local_path.split("/") - temp.pop() - dir_path = "/".join(temp) - dir_to_create = os.path.join(current_directory, dir_path) - os.makedirs( - dir_to_create, mode=0o777, exist_ok=True - ) # creates subfolders needed as per artifacts folder structure - local_file_path = os.path.join(current_directory, local_path) - local_file_path = os.path.abspath(local_file_path) - output = sftp.put(remote_file_path, local_file_path) + + dir_path = "" + if "/" in download_loc: + dir_path, _ = download_loc.rsplit("/", 1) + if dir_path != "": + # creates subfolders needed as per artifacts folder structure + os.makedirs(dir_path, mode=0o777, exist_ok=True) + + response = "" + + abs_download_loc = os.path.abspath(os.path.join(current_directory, download_loc)) + + """" + if object_name ends with .dir - it is a 
+        directory.
+        we download .dir object with 'temp_dir' and remove
+        this after all the files from this .dir object are downloaded.
+        """
+        if object_name.endswith('.dir'):
+            # in case of .dir, abs_download_loc is an absolute path for a folder
+            os.makedirs(abs_download_loc, mode=0o777, exist_ok=True)
+
+            # download .dir object
+            temp_dir = f"{abs_download_loc}/temp_dir"
+            response = sftp.put(object_name, temp_dir)
+
+            with open(temp_dir, 'r') as file:
+                tracked_files = eval(file.read())
+
+            # removing temp_dir
+            if os.path.exists(temp_dir):
+                os.remove(temp_dir)
+
+            """
+            object_name = /home/user/ssh-storage/files/md5/dd/2d792b7cf6efb02231f85c6147e403.dir
+            contains the path of the .dir on the artifact repo
+            we need to remove the hash of the .dir from the object_name
+            which will leave us with the artifact repo path
+            """
+            repo_path = "/".join(object_name.split("/")[:-2])
+            for file_info in tracked_files:
+                relpath = file_info['relpath']
+                md5_val = file_info['md5']
+                # download_loc = /home/user/datatslice/example-get-started/test/artifacts/raw_data
+                # md5_val = a237457aa730c396e5acdbc5a64c8453
+                # we need a2/37457aa730c396e5acdbc5a64c8453
+                formatted_md5 = md5_val[:2] + '/' + md5_val[2:]
+                temp_download_loc = f"{abs_download_loc}/{relpath}"
+                temp_object_name = f"{repo_path}/{formatted_md5}"
+                obj = sftp.put(object_name, temp_download_loc)
+                if obj:
+                    print(f"object {temp_object_name} downloaded at {temp_download_loc}.")
+        else:
+            response = sftp.put(object_name, abs_download_loc)
+
+        if response:
+            stmt = f"object {object_name} downloaded at {abs_download_loc}."
+            return stmt
+
         sftp.close()
         ssh.close()
-        if output:
-            stmt = f"object {remote_file_path} downloaded at {local_file_path}."
-            return stmt
         except TypeError as exception:
             return exception
diff --git a/examples/example-get-started/src/train.py b/examples/example-get-started/src/train.py
index 1f75cf65..eb456e14 100644
--- a/examples/example-get-started/src/train.py
+++ b/examples/example-get-started/src/train.py
@@ -65,6 +65,9 @@ def train(input_dir: str, output_dir: str) -> None:
     with open(model_file, "wb") as fd:
         pickle.dump(clf, fd)
 
+    _ = metawriter.log_metric("training_metrics", {"train_loss": 10})
+    _ = metawriter.commit_metrics("training_metrics")
+
     _ = metawriter.log_model(
         path=model_file, event="output", model_framework="SKlearn", model_type="RandomForestClassifier",
         model_name="RandomForestClassifier:default"
diff --git a/examples/example-get-started/test-data-slice.py b/examples/example-get-started/test-data-slice.py
index e442ff22..a25d1c95 100644
--- a/examples/example-get-started/test-data-slice.py
+++ b/examples/example-get-started/test-data-slice.py
@@ -46,7 +46,7 @@ def generate_dataset():
 
 # Note - metadata is stored in a file called "mlmd". It is a sqlite file.
 # To delete earlier metadata, delete this mlmd file.
-metawriter = cmf.Cmf(filename="mlmd", pipeline_name="dvc")
+metawriter = cmf.Cmf(filepath="mlmd", pipeline_name="Test-env")
 
 _ = metawriter.create_context(pipeline_stage="Prepare")
 _ = metawriter.create_execution(execution_type="Prepare")
@@ -87,11 +87,3 @@ def generate_dataset():
 for label, content in df.iterrows():
     if label == record:
         print(content)
-# cleanup
-folder_path = os.path.join(os.getcwd(), folder_path)
-if os.path.exists(folder_path):
-    rmtree(folder_path)
-
-dvc_file = folder_path + ".dvc"
-if os.path.exists(dvc_file):
-    os.remove(dvc_file)
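
Reviewer note (appended for clarity, not part of the patch): all four storage backends touched above implement the same `.dir` fan-out — download the DVC `.dir` listing, then fetch every tracked file by re-joining its md5 into the repo's `files/md5/<first-two-chars>/<rest>` layout. A minimal standalone sketch of that mapping step is shown below; the helper name `resolve_dir_entries` and the use of `json.loads` are illustrative assumptions, not code from this PR.

```python
import json
import os


def resolve_dir_entries(object_name: str, dir_listing: str, download_loc: str):
    """Yield (remote_object, local_path) pairs for a DVC .dir object.

    object_name  -- remote path of the .dir object, e.g. "files/md5/c9/d8fdacc0d942cf8d7d95b6301cfb97.dir"
    dir_listing  -- raw contents of the .dir object: a JSON list of
                    {"relpath": ..., "md5": ...} entries
    download_loc -- local folder the directory artifact is materialised into
    """
    # Drop the last two path components (md5 prefix dir + .dir file name)
    # to recover the artifact repo root, e.g. "files/md5".
    repo_path = "/".join(object_name.split("/")[:-2])

    for file_info in json.loads(dir_listing):
        md5_val = file_info["md5"]
        # a237457aa730c396e5acdbc5a64c8453 -> a2/37457aa730c396e5acdbc5a64c8453
        formatted_md5 = md5_val[:2] + "/" + md5_val[2:]
        yield f"{repo_path}/{formatted_md5}", os.path.join(download_loc, file_info["relpath"])
```

Each backend then downloads every yielded remote object to its local path with its own client call (`s3.download_file`, `client.fget_object`, `fs.get_file`, or sftp). The patch parses the listing with `eval()`; since a DVC `.dir` object is plain JSON, `json.loads` as sketched here would be the stricter choice.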