From 0269f34c98616b808e5ae559a0d287de824819fc Mon Sep 17 00:00:00 2001 From: ANNMARY JUSTINE Date: Wed, 28 Aug 2024 19:22:23 -0600 Subject: [PATCH] Path branch (#200) * Added path variables as class properties and put artifacts inside the respective executions * segregate by execution id * replace / with comma * removed duplicate word --- cmflib/cmf.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/cmflib/cmf.py b/cmflib/cmf.py index d33d4e1d..9690c230 100644 --- a/cmflib/cmf.py +++ b/cmflib/cmf.py @@ -101,6 +101,9 @@ class Cmf: # pylint: disable=too-many-instance-attributes # Reading CONFIG_FILE variable cmf_config = os.environ.get("CONFIG_FILE", ".cmfconfig") + ARTIFACTS_PATH = "cmf_artifacts" + DATASLICE_PATH = "dataslice" + METRICS_PATH = "metrics" if os.path.exists(cmf_config): attr_dict = CmfConfig.read_config(cmf_config) __neo4j_uri = attr_dict.get("neo4j-uri", "") @@ -1495,7 +1498,7 @@ def commit_metrics(self, metrics_name: str): assert self.execution is not None, f"Failed to create execution for {self.pipeline_name}!!" - directory_path = os.path.join( "cmf_artifacts/metrics",self.execution.properties["Execution_uuid"].string_value) + directory_path = os.path.join(ARTIFACTS_PATH, self.execution.properties["Execution_uuid"].string_value.split(',')[0], METRICS_PATH) os.makedirs(directory_path, exist_ok=True) metrics_df = pd.DataFrame.from_dict( self.metrics[metrics_name], orient="index") @@ -1721,7 +1724,7 @@ def create_dataslice(self, name: str) -> "Cmf.DataSlice": def read_dataslice(self, name: str) -> pd.DataFrame: """Reads the dataslice""" # To do checkout if not there - directory_path = os.path.join("cmf_artifacts/dataslices",self.execution.properties["Execution_uuid"].string_value) + directory_path = os.path.join(ARTIFACTS_PATH, self.execution.properties["Execution_uuid"].string_value.split(',')[0], DATASLICE_PATH) name = os.path.join(directory_path, name) df = pd.read_parquet(name) return df @@ -1743,7 +1746,7 @@ def update_dataslice(self, name: str, record: str, custom_properties: t.Dict): Returns: None """ - directory_path = os.path.join("cmf_artifacts/dataslices", self.execution.properties["Execution_uuid"].string_value) + directory_path = os.path.join(ARTIFACTS_PATH, self.execution.properties["Execution_uuid"].string_value.split(',')[0], DATASLICE_PATH) name = os.path.join(directory_path, name) df = pd.read_parquet(name) temp_dict = df.to_dict("index") @@ -1827,7 +1830,7 @@ def commit(self, custom_properties: t.Optional[t.Dict] = None) -> None: self.writer.create_execution(execution_type=name_without_extension) assert self.writer.execution is not None, f"Failed to create execution for {self.pipeline_name}!!" - directory_path = os.path.join( "cmf_artifacts/dataslices",self.writer.execution.properties["Execution_uuid"].string_value) + directory_path = os.path.join(self.writer.ARTIFACTS_PATH, self.writer.execution.properties["Execution_uuid"].string_value.split(',')[0], self.writer.DATASLICE_PATH) os.makedirs(directory_path, exist_ok=True) custom_props = {} if custom_properties is None else custom_properties git_repo = git_get_repo()