From 3b0881348da75b66ee9e6c76ea73c4c487e96611 Mon Sep 17 00:00:00 2001
From: abhinavchobey <111754147+abhinavchobey@users.noreply.github.com>
Date: Tue, 8 Oct 2024 05:47:12 +0530
Subject: [PATCH] Sync API to async API (#202)

* adding async POC changes

* adding changes with run_in_threadpool

* added async wrapper for the rest of the APIs

* adding update context and exec functions

* adding changes

* update

* adding lock to create_unique_exec

* added comments, removed unused libraries and imports

* removing lock from create_unique_exec and adding it to mlmd_push

* update parse_json_to_mlmd: adding comments and defining variable types

* adding changes for the case when pipeline_name doesn't exist in dict_of_exe_ids; this situation is rare but has been reproduced multiple times, so it is handled as a precaution

* fixed a typo

* resolving GUI bugs: double-clicking an artifact type no longer issues multiple API calls, and fixed a click issue between artifact types of different pipelines

* adding uuid to dataset artifacts

* handling exception in artifact push if artifacts are empty

* update query_artifact_lineage_d3tree.py comments

---------

Co-authored-by: Abhinav Chobey
Co-authored-by: Varkha Sharma <112053040+varkha-d-sharma@users.noreply.github.com>
Co-authored-by: First Second
---
 cmflib/cmf.py                                 |  45 +++-
 cmflib/cmf_merger.py                          | 218 ++++++++++--------
 cmflib/cmfquery.py                            |   2 -
 cmflib/commands/artifact/push.py              |   9 +-
 cmflib/commands/metadata/push.py              |   1 -
 cmflib/metadata_helper.py                     |  15 +-
 server/app/get_data.py                        |  48 ++--
 server/app/main.py                            |  80 ++++---
 server/app/query_artifact_lineage_d3force.py  |  10 +-
 server/app/query_artifact_lineage_d3tree.py   |   6 +-
 server/app/query_execution_lineage_d3force.py |   2 +-
 server/app/query_execution_lineage_d3tree.py  |   3 +-
 server/app/query_list_of_executions.py        |   9 -
 .../pages/artifacts/ArtifactTypeSidebar.jsx   |  10 +-
 ui/src/pages/artifacts/index.jsx              |  45 ++--
 15 files changed, 305 insertions(+), 198 deletions(-)

diff --git a/cmflib/cmf.py b/cmflib/cmf.py
index a1a2d4f0..58cc2ec3 100644
--- a/cmflib/cmf.py
+++ b/cmflib/cmf.py
@@ -24,7 +24,6 @@
 import typing as t

 # This import is needed for jupyterlab environment
-import dvc
 from ml_metadata.proto import metadata_store_pb2 as mlpb
 from ml_metadata.metadata_store import metadata_store
 from cmflib.dvc_wrapper import (
@@ -44,6 +43,8 @@
 from cmflib.metadata_helper import (
     get_or_create_parent_context,
     get_or_create_run_context,
+    get_or_create_context_with_type,
+    update_context_custom_properties,
     associate_child_to_parent_context,
     create_new_execution_in_existing_run_context,
     link_execution_to_artifact,
@@ -313,6 +314,42 @@ def merge_created_context(
         )
         return ctx

+    def update_context(
+        self,
+        type_name: str,
+        context_name: str,
+        context_id: int,
+        properties: t.Optional[t.Dict] = None,
+        custom_properties: t.Optional[t.Dict] = None
+    ) -> mlpb.Context:
+        self.context = get_or_create_context_with_type(
+            self.store,
+            context_name,
+            type_name,
+            properties,
+            type_properties=None,
+            custom_properties=custom_properties
+        )
+        if self.context is None:
+            print("Error - no context id")
+            return
+
+        if custom_properties:
+            for key, value in custom_properties.items():
+                if isinstance(value, int):
+                    self.context.custom_properties[key].int_value = value
+                else:
+                    self.context.custom_properties[key].string_value = str(value)
+        updated_context = update_context_custom_properties(
+            self.store,
+            context_id,
+            context_name,
+            self.context.properties,
+            self.context.custom_properties,
+        )
+        return updated_context
+
     def create_execution(
         self,
         execution_type: str,
@@ -1678,14 +1715,14 @@
             custom_properties: Dictionary containing custom properties to update.
         Returns:
             None
-        """
-
+        """
         for key, value in custom_properties.items():
             if isinstance(value, int):
                 artifact.custom_properties[key].int_value = value
             else:
                 artifact.custom_properties[key].string_value = str(value)
         put_artifact(self.store, artifact)
+

     def get_artifact(self, artifact_id: int) -> mlpb.Artifact:
         """Gets the artifact object from mlmd"""
@@ -1779,7 +1816,7 @@
             should already be versioned.
         Example:
             ```python
-            dataslice.add_data(f"data/raw_data/{j}.xml)
+            #dataslice.add_data(f"data/raw_data/{j}.xml")
             ```
         Args:
             path: Name to identify the file to be added to the dataslice.
diff --git a/cmflib/cmf_merger.py b/cmflib/cmf_merger.py
index d2be110b..82c6082d 100644
--- a/cmflib/cmf_merger.py
+++ b/cmflib/cmf_merger.py
@@ -1,4 +1,4 @@
-###
+#
 # Copyright (2022) Hewlett Packard Enterprise Development LP
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -17,103 +17,141 @@
 import json
 import os
 from cmflib import cmf
+import traceback
+from ml_metadata.errors import AlreadyExistsError
+from ml_metadata.metadata_store import metadata_store
+from ml_metadata.proto import metadata_store_pb2 as mlpb
+from typing import Union
 
-
-# mlmd is created from metadata passed in Json format
-def parse_json_to_mlmd(mlmd_json, path_to_store, cmd, exec_id):
-    mlmd_data = json.loads(mlmd_json)
-    # type(mlmd_data)
-    pipelines = mlmd_data["Pipeline"]
-    # print(type(pipelines))
-    pipeline = pipelines[0]
-    # print(type(pipeline))
-    pipeline_name = pipeline["name"]
-    # print(type(pipeline_name))
-    stage = {}
-    if cmd == "push":
-        data = create_original_time_since_epoch(mlmd_data)
-    else:
-        data = mlmd_data
-    graph = False
-    if os.getenv('NEO4J_URI', "") != "":
-        graph = True
-    cmf_class = cmf.Cmf(filepath=path_to_store, pipeline_name=pipeline_name,
-                        graph=graph, is_server=True)
-    for stage in data["Pipeline"][0]["stages"]:  # Iterates over all the stages
-        if exec_id is None:
-            list_executions = [execution for execution in stage["executions"]]
-        elif exec_id is not None:
-            list_executions = [
-                execution
-                for execution in stage["executions"]
-                if execution["id"] == int(exec_id)
-            ]
+def parse_json_to_mlmd(mlmd_json, path_to_store: str, cmd: str, exec_id: Union[str, int]) -> Union[str, None]:
+    try:
+        mlmd_data = json.loads(mlmd_json)
+        pipelines = mlmd_data["Pipeline"]
+        pipeline = pipelines[0]
+        pipeline_name = pipeline["name"]
+        stage = {}
+
+        # When the command is "push", add original_time_since_epoch to the custom_properties in the metadata; this is not needed while pulling mlmd.
+        if cmd == "push":
+            data = create_original_time_since_epoch(mlmd_data)
         else:
-            return "Invalid execution id given."
+            data = mlmd_data
 
-        for execution in list_executions:  # Iterates over all the executions
-            _ = cmf_class.merge_created_context(
-                pipeline_stage = stage['name'],
-                custom_properties = stage["custom_properties"],
-            )
-            _ = cmf_class.merge_created_execution(
-                execution["properties"]["Context_Type"],
-                execution["properties"]["Execution"],
-                execution["properties"],
-                execution["custom_properties"],
-                execution["name"]
-            )
-            for event in execution["events"]:  # Iterates over all the events
-                artifact_type = event["artifact"]["type"]
-                event_type = event["type"]
-                artifact_name = (event["artifact"]["name"].split(":"))[0]
-                custom_props = event["artifact"]["custom_properties"]
-                props = event["artifact"]["properties"]
-                # print(props,'props')
-                uri = event["artifact"]["uri"]
-                if artifact_type == "Dataset" and event_type == 3:
-                    cmf_class.log_dataset_with_version(
-                        artifact_name,
-                        uri,
-                        "input",
-                        props,
-                        custom_properties=custom_props,
+        graph = False
+        # if cmf is configured with 'neo4j', set graph to True.
+        if os.getenv('NEO4J_URI', "") != "":
+            graph = True
+
+        # Initialize the connection configuration and metadata store
+        config = mlpb.ConnectionConfig()
+        config.sqlite.filename_uri = path_to_store
+        store = metadata_store.MetadataStore(config)
+
+        # Initialize the cmf class with pipeline_name and graph_status
+        cmf_class = cmf.Cmf(filepath=path_to_store, pipeline_name=pipeline_name,  # initializing cmf
+                            graph=graph, is_server=True)
+
+        for stage in data["Pipeline"][0]["stages"]:  # Iterates over all the stages
+            if exec_id is None:  # if exec_id is None, we pass all the executions.
+                list_executions = [execution for execution in stage["executions"]]
+            elif exec_id is not None:  # if exec_id is not None, we pass only the executions for that specific id.
+                list_executions = [
+                    execution
+                    for execution in stage["executions"]
+                    if execution["id"] == int(exec_id)
+                ]
+            else:
+                return "Invalid execution id given."
+
+            for execution in list_executions:  # Iterates over all the executions
+                try:
+                    _ = cmf_class.merge_created_context(
+                        pipeline_stage=stage['name'],
+                        custom_properties=stage["custom_properties"],
                     )
-                elif artifact_type == "Dataset" and event_type == 4:
-                    cmf_class.log_dataset_with_version(
-                        artifact_name,
-                        uri,
-                        "output",
-                        props,
-                        custom_properties=custom_props,
+                except AlreadyExistsError as e:
+                    # Handle the case where the context already exists, possibly due to concurrent pushes,
+                    # as both pipelines will be unable to fetch data from the server;
+                    # update custom properties if the context already exists
+                    _ = cmf_class.update_context(
+                        str(stage["type"]),
+                        str(stage["name"]),
+                        stage["id"],
+                        stage["properties"],
+                        custom_properties=stage["custom_properties"]
                     )
-                elif artifact_type == "Model" and event_type == 3:
-                    props["uri"] = uri
-                    cmf_class.log_model_with_version(
-                        path=artifact_name,
-                        event="input",
-                        props=props,
-                        custom_properties=custom_props,
+                except Exception as e:
+                    print(f"Error in merge_created_context: {e}")
+                try:
+                    _ = cmf_class.merge_created_execution(
+                        execution["properties"]["Context_Type"],
+                        execution["properties"]["Execution"],
+                        execution["properties"],
+                        execution["custom_properties"],
+                        execution["name"]
                     )
-                elif artifact_type == "Model" and event_type == 4:
-                    props["uri"] = uri
-                    cmf_class.log_model_with_version(
-                        path=artifact_name,
-                        event="output",
-                        props=props,
-                        custom_properties=custom_props,
+                except AlreadyExistsError as e:
+                    _ = cmf_class.update_execution(
+                        execution["id"],
+                        execution["custom_properties"]
                     )
-                elif artifact_type == "Metrics":
-                    # print(props,'parse')
-                    cmf_class.log_execution_metrics_from_client(event["artifact"]["name"], custom_props)
-                elif artifact_type == "Dataslice":
-                    dataslice = cmf_class.create_dataslice(event["artifact"]["name"])
-                    dataslice.commit_existing(uri, props, custom_props)
-                elif artifact_type == "Step_Metrics":
-                    cmf_class.commit_existing_metrics(event["artifact"]["name"], uri, props, custom_props)
-                else:
-                    pass
+                except Exception as e:
+                    print(f"Error in merge_created_execution: {e}")
+
+                for event in execution["events"]:  # Iterates over all the events
+                    artifact_type = event["artifact"]["type"]
+                    event_type = event["type"]
+                    artifact_name = (event["artifact"]["name"].split(":"))[0]
+                    custom_props = event["artifact"]["custom_properties"]
+                    props = event["artifact"]["properties"]
+                    uri = event["artifact"]["uri"]
+                    try:
+                        if artifact_type == "Dataset":
+                            if event_type == 3:
+                                event_io = "input"
+                            else:
+                                event_io = "output"
+                            cmf_class.log_dataset_with_version(
+                                artifact_name,
+                                uri,
+                                event_io,
+                                props,
+                                custom_properties=custom_props,
+                            )
+                        elif artifact_type == "Model":
+                            if event_type == 3:
+                                event_io = "input"
+                            else:
+                                event_io = "output"
+                            props["uri"] = uri
+                            cmf_class.log_model_with_version(
+                                path=artifact_name,
+                                event=event_io,
+                                props=props,
+                                custom_properties=custom_props,
+                            )
+                        elif artifact_type == "Metrics":
+                            cmf_class.log_execution_metrics_from_client(event["artifact"]["name"], custom_props)
+                        elif artifact_type == "Dataslice":
+                            dataslice = cmf_class.create_dataslice(event["artifact"]["name"])
+                            dataslice.commit_existing(uri, custom_props)
+                        elif artifact_type == "Step_Metrics":
+                            cmf_class.commit_existing_metrics(event["artifact"]["name"], uri, custom_props)
+                        else:
+                            pass
+                    except AlreadyExistsError as e:
+                        # if the same pipeline is pushed twice at the same time, update custom_properties using the 2nd push
+                        artifact = store.get_artifacts_by_uri(uri)
+                        cmf_class.update_existing_artifact(
+                            artifact[0],
+                            custom_properties=custom_props,
+                        )
+                    except Exception as e:
+                        print(f"Error in log_{artifact_type}_with_version: {e}")
+    except Exception as e:
+        print(f"An error occurred in parse_json_to_mlmd: {e}")
+        traceback.print_exc()
 
 # create_time_since_epoch is appended to mlmd pushed to cmf-server as original_create_time_since_epoch
 def create_original_time_since_epoch(mlmd_data):
diff --git a/cmflib/cmfquery.py b/cmflib/cmfquery.py
index 426538f8..8251569d 100644
--- a/cmflib/cmfquery.py
+++ b/cmflib/cmfquery.py
@@ -19,11 +19,9 @@
 import typing as t
 from enum import Enum
 from google.protobuf.json_format import MessageToDict
-
 import pandas as pd
 from ml_metadata.metadata_store import metadata_store
 from ml_metadata.proto import metadata_store_pb2 as mlpb
-
 from cmflib.mlmd_objects import CONTEXT_LIST
 
 __all__ = ["CmfQuery"]
diff --git a/cmflib/commands/artifact/push.py b/cmflib/commands/artifact/push.py
index b9f115b4..6957c8c6 100644
--- a/cmflib/commands/artifact/push.py
+++ b/cmflib/commands/artifact/push.py
@@ -93,10 +93,11 @@ def run(self):
                 identifier
             )  # getting all artifacts with id
             # dropping artifact with type 'metrics' as metrics doesn't have physical file
-            artifacts = artifacts[artifacts['type'] != 'Metrics']
-            # adding .dvc at the end of every file as it is needed for pull
-            artifacts['name'] = artifacts['name'].apply(lambda name: f"{name.split(':')[0]}.dvc")
-            names.extend(artifacts['name'].tolist())
+            if not artifacts.empty:
+                artifacts = artifacts[artifacts['type'] != 'Metrics']
+                # adding .dvc at the end of every file as it is needed for pull
+                artifacts['name'] = artifacts['name'].apply(lambda name: f"{name.split(':')[0]}.dvc")
+                names.extend(artifacts['name'].tolist())
         file_set = set(names)
         result = dvc_push(list(file_set))
         return result
diff --git a/cmflib/commands/metadata/push.py b/cmflib/commands/metadata/push.py
index 05141e74..bd630397 100644
--- a/cmflib/commands/metadata/push.py
+++ b/cmflib/commands/metadata/push.py
@@ -24,7 +24,6 @@
 from cmflib.server_interface import server_interface
 from cmflib.utils.cmf_config import CmfConfig
 
-
 # This class pushes mlmd file to cmf-server
 class CmdMetadataPush(CmdBase):
     def run(self):
diff --git a/cmflib/metadata_helper.py b/cmflib/metadata_helper.py
index b7a2065f..91f3b6d1 100644
--- a/cmflib/metadata_helper.py
+++ b/cmflib/metadata_helper.py
@@ -23,8 +23,6 @@
 from ipaddress import ip_address, IPv4Address
 from typing import List
 import functools
-import uuid
-
 
 def value_to_mlmd_value(value) -> metadata_store_pb2.Value:
     if value is None:
@@ -68,7 +66,7 @@ def connect_to_mlmd() -> metadata_store.MetadataStore:
     raise RuntimeError('Could not connect to the Metadata store.')
 
-def get_artifacts_by_id(store, artifact_id: [int]) -> List[metadata_store_pb2.Artifact]:
+def get_artifacts_by_id(store, artifact_id: List[int]) -> List[metadata_store_pb2.Artifact]:
     try:
         artifacts = store.get_artifacts_by_id(artifact_id)
         return artifacts
@@ -123,6 +121,15 @@ def get_or_create_context_type(store, type_name, properties: dict = None) -> met
     context_type.id = store.put_context_type(context_type)  # Returns ID
     return context_type
 
+def update_context_custom_properties(store, context_id, context_name: str, properties: dict, custom_properties: dict) -> metadata_store_pb2.Context:
+    context = metadata_store_pb2.Context(
+        id = context_id,
+        name=context_name,
+        properties=properties,
+        custom_properties=custom_properties,
+    )
+    store.put_contexts([context])
+    return context
 
 def create_artifact_with_type(
     store,
@@ -387,7 +394,7 @@ def create_new_execution_in_existing_run_context(
     git_start_commit: str = None,
     git_end_commit: str = "",
     python_env: str = "",
-    custom_properties: {} = None,
+    custom_properties: dict = None,
     create_new_execution:bool = True
 ) -> metadata_store_pb2.Execution:
     mlmd_custom_properties = {}
diff --git a/server/app/get_data.py b/server/app/get_data.py
index 4ee80431..0a0b6188 100644
--- a/server/app/get_data.py
+++ b/server/app/get_data.py
@@ -2,10 +2,10 @@
 import pandas as pd
 import json
 import os
+import typing as t
+from fastapi.concurrency import run_in_threadpool
 from server.app.query_artifact_lineage_d3force import query_artifact_lineage_d3force
 from server.app.query_list_of_executions import query_list_of_executions
-from fastapi.responses import FileResponse
-
 
 async def get_model_data(mlmdfilepath, modelId):
     '''
@@ -78,7 +78,11 @@ async def get_model_data(mlmdfilepath, modelId):
     return model_data_df, model_exe_df, model_input_df, model_output_df
 
-async def get_executions(mlmdfilepath, pipeline_name, exe_ids):
+# Converts sync functions to async by running them in a threadpool
+async def async_api(function_to_async, mlmdfilepath: str, *argv):
+    return await run_in_threadpool(function_to_async, mlmdfilepath, *argv)
+
+def get_executions(mlmdfilepath: str, pipeline_name, exe_ids) -> pd.DataFrame:
     '''
     Args:
         mlmdfilepath: mlmd file path.
@@ -88,18 +92,18 @@
     Returns:
         returns dataframe of executions using execution_ids.
     '''
+
     query = cmfquery.CmfQuery(mlmdfilepath)
     df = pd.DataFrame()
     executions = query.get_all_executions_by_ids_list(exe_ids)
     df = pd.concat([df, executions], sort=True, ignore_index=True)
-    #df=df.drop('name',axis=1)
     return df
 
 
-async def get_all_exe_ids(mlmdfilepath, pipeline_name: str = None):
+def get_all_exe_ids(mlmdfilepath: str, pipeline_name: str = None) -> t.Dict[str, pd.DataFrame]:
     '''
     Returns:
-        returns a dictionary which has pipeline_name as key and dataframe which includes {id,Execution_uuid,Context_Type,Context_id} as value.
+        returns a dictionary which has pipeline_name as key and dataframe which includes {id,Execution_uuid,Context_Type,Context_id} as value.
     '''
     query = cmfquery.CmfQuery(mlmdfilepath)
     execution_ids = {}
@@ -125,7 +129,7 @@
     return execution_ids
 
 
-async def get_all_artifact_ids(mlmdfilepath, execution_ids, pipeline_name: str = None):
+def get_all_artifact_ids(mlmdfilepath: str, execution_ids, pipeline_name: str = None) -> t.Dict[str, t.Dict[str, pd.DataFrame]]:
     # following is a dictionary of dictionaries
     # First level dictionary key is pipeline_name
     # First level dicitonary value is nested dictionary
@@ -170,8 +174,7 @@
             artifact_ids[name] = pd.DataFrame()
     return artifact_ids
 
-
-async def get_artifacts(mlmdfilepath, pipeline_name, art_type, artifact_ids):
+def get_artifacts(mlmdfilepath, pipeline_name, art_type, artifact_ids):
     query = cmfquery.CmfQuery(mlmdfilepath)
     df = pd.DataFrame()
     if (query.get_pipeline_id(pipeline_name) != -1):
@@ -202,16 +205,29 @@
         tempout = json.loads(result)
     return tempout
 
-def get_artifact_types(mlmdfilepath):
+def get_artifact_types(mlmdfilepath) -> t.List[str]:
     query = cmfquery.CmfQuery(mlmdfilepath)
     artifact_types = query.get_all_artifact_types()
     return artifact_types
 
-async def create_unique_executions(server_store_path, req_info):
+def create_unique_executions(server_store_path, req_info) -> str:
+    """
+    Creates a list of unique executions by checking whether they already exist on the server.
+    A lock is introduced to avoid data corruption on the server when multiple similar
+    pipelines are pushed at the same time.
+    Args:
+        server_store_path = mlmd file path on server
+    Returns:
+        Status of parse_json_to_mlmd
+        "exists": if the execution already exists on the cmf-server
+        "success": the execution was pushed successfully to the cmf-server
+    """
     mlmd_data = json.loads(req_info["json_payload"])
     pipelines = mlmd_data["Pipeline"]
     pipeline = pipelines[0]
     pipeline_name = pipeline["name"]
+    if not pipeline_name:
+        return {"error": "Pipeline name is required"}
     executions_server = []
     list_executions_exists = []
     if os.path.exists(server_store_path):
@@ -225,7 +241,7 @@
             for j in i["executions"]:
                 if j['name'] != "":  #If executions have a name, they are reusable executions
                     continue         #which need to be merged in irrespective of whether already
-                                     #present or not so that new artifacts associated with it gets in.
+                                     #present or not, so that new artifacts associated with them get in.
                 if 'Execution_uuid' in j['properties']:
                     for uuid in j['properties']['Execution_uuid'].split(","):
                         executions_client.append(uuid)
@@ -253,12 +269,12 @@
             json.dumps(mlmd_data), "/cmf-server/data/mlmd", "push", req_info["id"]
         )
         status='success'
+
     return status
 
-async def get_mlmd_from_server(server_store_path: str, pipeline_name: str, exec_id: str):
+def get_mlmd_from_server(server_store_path: str, pipeline_name: str, exec_id: str):
     query = cmfquery.CmfQuery(server_store_path)
-    execution_flag = 0
     json_payload = None
     df = pd.DataFrame()
     if(query.get_pipeline_id(pipeline_name)!=-1):  # checks if pipeline name is available in mlmd
@@ -271,9 +287,7 @@
             json_payload = query.dumptojson(pipeline_name, exec_id)
     return json_payload
 
-
-async def get_lineage_data(server_store_path,pipeline_name,type,dict_of_art_ids,dict_of_exe_ids):
-    query = cmfquery.CmfQuery(server_store_path)
+def get_lineage_data(server_store_path,pipeline_name,type,dict_of_art_ids,dict_of_exe_ids):
     if type=="Artifacts":
         lineage_data = query_artifact_lineage_d3force(server_store_path, pipeline_name, dict_of_art_ids)
         '''
diff --git a/server/app/main.py b/server/app/main.py
index 4ab11071..ad2a5148 100644
--- a/server/app/main.py
+++ b/server/app/main.py
@@ -1,13 +1,15 @@
 # cmf-server api's
-from fastapi import FastAPI, Request, status, HTTPException, Query, UploadFile, File
+from fastapi import FastAPI, Request, HTTPException, Query, UploadFile, File
 from fastapi.middleware.cors import CORSMiddleware
-from fastapi.responses import HTMLResponse, StreamingResponse
+from fastapi.responses import HTMLResponse
 from fastapi.staticfiles import StaticFiles
 from contextlib import asynccontextmanager
 import pandas as pd
 from typing import List, Dict, Any
-
-from cmflib import cmfquery, cmf_merger
+from cmflib import cmfquery
+import asyncio
+import threading
+from collections import defaultdict
 from server.app.get_data import (
     get_artifacts,
     get_lineage_data,
@@ -16,8 +18,10 @@
     get_artifact_types,
     get_all_artifact_ids,
     get_all_exe_ids,
+    async_api,
     get_executions,
     get_model_data
+
 )
 from server.app.query_artifact_lineage_d3force import query_artifact_lineage_d3force
 from server.app.query_execution_lineage_d3force import query_execution_lineage_d3force
@@ -31,7 +35,8 @@
 dict_of_art_ids = {}
 dict_of_exe_ids = {}
 
-
+pipeline_locks = {}
+lock_counts = defaultdict(int)
 #lifespan used to prevent multiple loading and save time for visualization.
 @asynccontextmanager
 async def lifespan(app: FastAPI):
@@ -39,9 +44,9 @@
     global dict_of_exe_ids
     if os.path.exists(server_store_path):
         # loaded execution ids with names into memory
-        dict_of_exe_ids = await get_all_exe_ids(server_store_path)
+        dict_of_exe_ids = await async_api(get_all_exe_ids, server_store_path)
         # loaded artifact ids into memory
-        dict_of_art_ids = await get_all_artifact_ids(server_store_path, dict_of_exe_ids)
+        dict_of_art_ids = await async_api(get_all_artifact_ids, server_store_path, dict_of_exe_ids)
     yield
     dict_of_art_ids.clear()
     dict_of_exe_ids.clear()
@@ -75,7 +80,6 @@
     allow_headers=["*"],
 )
 
-
 @app.get("/")
 async def read_root(request: Request):
     return {"cmf-server"}
@@ -87,16 +91,29 @@
 async def mlmd_push(info: Request):
     print("......................")
     req_info = await info.json()
     pipeline_name = req_info["pipeline_name"]
-    status = await create_unique_executions(server_store_path, req_info)
-    if status == "version_update":
-        # Raise an HTTPException with status code 422
-        raise HTTPException(status_code=422, detail="version_update")
-    # async function
-    await update_global_exe_dict(pipeline_name)
-    await update_global_art_dict(pipeline_name)
+    if not pipeline_name:
+        return {"error": "Pipeline name is required"}
+    if pipeline_name not in pipeline_locks:  # create a lock object for the pipeline if it doesn't exist yet
+        pipeline_locks[pipeline_name] = asyncio.Lock()
+    pipeline_lock = pipeline_locks[pipeline_name]
+    lock_counts[pipeline_name] += 1  # increment the lock count before the pipeline enters the locked section
+    async with pipeline_lock:
+        try:
+            status = await async_api(create_unique_executions, server_store_path, req_info)
+            if status == "version_update":
+                # Raise an HTTPException with status code 422
+                raise HTTPException(status_code=422, detail="version_update")
+            if status != "exists":
+                # async function
+                await update_global_exe_dict(pipeline_name)
+                await update_global_art_dict(pipeline_name)
+        finally:
+            lock_counts[pipeline_name] -= 1  # decrement the reference count after the lock is released
+            if lock_counts[pipeline_name] == 0:  # if the pipeline's lock count is zero, nothing holds or waits on the lock
+                del pipeline_locks[pipeline_name]  # remove the lock as it's no longer needed
+                del lock_counts[pipeline_name]
     return {"status": status, "data": req_info}
 
-
 # api to get mlmd file from cmf-server
 @app.get("/mlmd_pull/{pipeline_name}", response_class=HTMLResponse)
 async def mlmd_pull(info: Request, pipeline_name: str):
@@ -104,7 +121,7 @@
     req_info = await info.json()
     if os.path.exists(server_store_path):
         #json_payload values can be json data, NULL or no_exec_id.
-        json_payload= await get_mlmd_from_server(server_store_path, pipeline_name, req_info['exec_id'])
+        json_payload = await async_api(get_mlmd_from_server, server_store_path, pipeline_name, req_info['exec_id'])
     else:
         print("No mlmd file submitted.")
         json_payload = ""
@@ -123,7 +140,7 @@ async def executions(
     filter_value: str = Query(None, description="Filter value"),
 ):
     # checks if mlmd file exists on server
-    if os.path.exists(server_store_path):
+    if os.path.exists(server_store_path) and pipeline_name in dict_of_exe_ids:
         exe_ids_initial = dict_of_exe_ids[pipeline_name]
         # Apply filtering if provided
         if filter_by and filter_value:
@@ -137,7 +154,7 @@
         if total_items < end_idx:
             end_idx = total_items
         exe_ids_list = exe_ids[start_idx:end_idx]
-        executions_df = await get_executions(server_store_path, pipeline_name, exe_ids_list)
+        executions_df = await async_api(get_executions, server_store_path, pipeline_name, exe_ids_list)
         temp = executions_df.to_json(orient="records")
         executions_parsed = json.loads(temp)
         return {
@@ -158,12 +175,10 @@ async def artifact_lineage(request: Request, pipeline_name: str):
     '''
     # checks if mlmd file exists on server
-
     if os.path.exists(server_store_path):
         query = cmfquery.CmfQuery(server_store_path)
         if (pipeline_name in query.get_pipeline_names()):
-            response=await get_lineage_data(server_store_path,pipeline_name,"Artifacts",dict_of_art_ids,dict_of_exe_ids)
-            #response = None
+            response = await async_api(get_lineage_data, server_store_path,pipeline_name,"Artifacts",dict_of_art_ids,dict_of_exe_ids)
             return response
         else:
             return f"Pipeline name {pipeline_name} doesn't exist."
@@ -181,7 +196,7 @@ async def list_of_executions(request: Request, pipeline_name: str):
     if os.path.exists(server_store_path):
         query = cmfquery.CmfQuery(server_store_path)
         if (pipeline_name in query.get_pipeline_names()):
-            response = await get_lineage_data(server_store_path,pipeline_name,"Execution",dict_of_art_ids,dict_of_exe_ids)
+            response = await async_api(get_lineage_data, server_store_path,pipeline_name,"Execution",dict_of_art_ids,dict_of_exe_ids)
             return response
         else:
             return f"Pipeline name {pipeline_name} doesn't exist."
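A note on the locking in /mlmd_push above: each pipeline is guarded by a reference-counted asyncio.Lock, so concurrent pushes of the same pipeline are serialized while pushes of different pipelines proceed in parallel, and the lock is discarded once no request holds or waits on it. A minimal standalone sketch of that pattern, with illustrative names (guarded_push, do_push) that are not part of this patch:

```python
import asyncio
from collections import defaultdict

pipeline_locks: dict = {}
lock_counts: defaultdict = defaultdict(int)

async def guarded_push(pipeline_name: str, do_push):
    # create a lock object for the pipeline if it doesn't exist yet
    if pipeline_name not in pipeline_locks:
        pipeline_locks[pipeline_name] = asyncio.Lock()
    pipeline_lock = pipeline_locks[pipeline_name]
    lock_counts[pipeline_name] += 1  # count the holder plus all waiters
    try:
        async with pipeline_lock:
            return await do_push()
    finally:
        lock_counts[pipeline_name] -= 1
        # once nothing holds or waits on the lock, drop it so the server
        # doesn't leak one lock per pipeline name over its lifetime
        if lock_counts[pipeline_name] == 0:
            del pipeline_locks[pipeline_name]
            del lock_counts[pipeline_name]
```

Because these handlers all run on a single event loop, the two bookkeeping dictionaries themselves need no additional synchronization.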
@@ -202,7 +217,7 @@ async def execution_lineage(request: Request, pipeline_name: str, uuid: str):
     if os.path.exists(server_store_path):
         query = cmfquery.CmfQuery(server_store_path)
         if (pipeline_name in query.get_pipeline_names()):
-            response = await query_execution_lineage_d3force(server_store_path, pipeline_name, dict_of_exe_ids, uuid)
+            response = await async_api(query_execution_lineage_d3force, server_store_path, pipeline_name, dict_of_exe_ids, uuid)
         else:
             response = None
     return response
@@ -220,7 +235,7 @@ async def execution_lineage(request: Request,uuid, pipeline_name: str):
     if os.path.exists(server_store_path):
         query = cmfquery.CmfQuery(server_store_path)
         if (pipeline_name in query.get_pipeline_names()):
-            response = await query_execution_lineage_d3tree(server_store_path, pipeline_name, dict_of_exe_ids,uuid)
+            response = await async_api(query_execution_lineage_d3tree, server_store_path, pipeline_name, dict_of_exe_ids,uuid)
     return response
 
 # api to display artifacts available in mlmd
@@ -236,7 +251,6 @@ async def artifacts(
     filter_by: str = Query(None, description="Filter by column"),
     filter_value: str = Query(None, description="Filter value"),
 ):
-    empty_df = pd.DataFrame()
     art_ids_dict = {}
     art_type = type
     # checks if mlmd file exists on server
@@ -267,7 +281,7 @@
         if total_items < end_idx:
             end_idx = total_items
         artifact_id_list = list(art_ids)[start_idx:end_idx]
-        artifact_df = await get_artifacts(server_store_path, pipeline_name, art_type, artifact_id_list)
+        artifact_df = await async_api(get_artifacts, server_store_path, pipeline_name, art_type, artifact_id_list)
         data_paginated = artifact_df  #data_paginated is returned None if artifact df is None or {}
         #it will load empty page, without this condition it will load
@@ -301,8 +315,7 @@ async def artifact_lineage(request: Request, pipeline_name: str) -> List[List[Di
     if os.path.exists(server_store_path):
         query = cmfquery.CmfQuery(server_store_path)
         if (pipeline_name in query.get_pipeline_names()):
-            response = await query_artifact_lineage_d3tree(server_store_path, pipeline_name, dict_of_art_ids)
-            #response = "null"
+            response = await async_api(query_artifact_lineage_d3tree, server_store_path, pipeline_name, dict_of_art_ids)
     return response
 
 #This api's returns list of artifact types.
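The async_api helper these endpoints call (added in server/app/get_data.py above) is the whole conversion mechanism: the previously async query functions become plain sync functions, and their blocking CmfQuery/pandas work is pushed onto Starlette's threadpool so the event loop is never stalled. Restated on its own for clarity:

```python
from fastapi.concurrency import run_in_threadpool

# Converts sync functions to async: run the callable in a worker thread
# and await its result so the event loop stays free to serve other requests.
async def async_api(function_to_async, mlmdfilepath: str, *argv):
    return await run_in_threadpool(function_to_async, mlmdfilepath, *argv)

# Usage, mirroring the endpoints above:
#   executions_df = await async_api(get_executions, server_store_path, pipeline_name, exe_ids_list)
```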
@@ -310,7 +323,7 @@ async def artifact_lineage(request: Request, pipeline_name: str) -> List[List[Di
 async def artifact_types(request: Request):
     # checks if mlmd file exists on server
     if os.path.exists(server_store_path):
-        artifact_types = get_artifact_types(server_store_path)
+        artifact_types = await async_api(get_artifact_types, server_store_path)
         return artifact_types
     else:
         artifact_types = ""
@@ -352,7 +365,6 @@ async def model_card(request:Request, modelId: int, response_model=List[Dict[str
     model_exe_df = pd.DataFrame()
     model_input_art_df = pd.DataFrame()
     model_output_art_df = pd.DataFrame()
-    df = pd.DataFrame()
     # checks if mlmd file exists on server
     if os.path.exists(server_store_path):
         model_data_df, model_exe_df, model_input_art_df, model_output_art_df = await get_model_data(server_store_path, modelId)
@@ -372,15 +384,15 @@
 async def update_global_art_dict(pipeline_name):
     global dict_of_art_ids
-    output_dict = await get_all_artifact_ids(server_store_path, dict_of_exe_ids, pipeline_name)
-    # type(dict_of_exe_ids[pipeline_name]) = Dict[ ]
+    output_dict = await async_api(get_all_artifact_ids, server_store_path, dict_of_exe_ids, pipeline_name)
+    # type(dict_of_art_ids[pipeline_name]) = Dict[ ]
     dict_of_art_ids[pipeline_name]=output_dict[pipeline_name]
     return
 
 
 async def update_global_exe_dict(pipeline_name):
     global dict_of_exe_ids
-    output_dict = await get_all_exe_ids(server_store_path, pipeline_name)
+    output_dict = await async_api(get_all_exe_ids, server_store_path, pipeline_name)
     # type(dict_of_exe_ids[pipeline_name]) =
     dict_of_exe_ids[pipeline_name] = output_dict[pipeline_name]
     return
diff --git a/server/app/query_artifact_lineage_d3force.py b/server/app/query_artifact_lineage_d3force.py
index 9cd60e59..1532709b 100644
--- a/server/app/query_artifact_lineage_d3force.py
+++ b/server/app/query_artifact_lineage_d3force.py
@@ -1,13 +1,7 @@
-import itertools
-import re
-import networkx as nx
-from networkx.drawing.nx_agraph import graphviz_layout
 import pandas as pd
 from cmflib import cmfquery
-import dvc
-import json
-import random
 import warnings
+import typing as t
 
 warnings.filterwarnings("ignore")
@@ -19,7 +13,7 @@ def truncate_artifact_name(my_str):
         temp=":".join(temp)
     return temp
 
-def query_artifact_lineage_d3force(mlmd_path, pipeline_name, dict_of_art_ids):
+def query_artifact_lineage_d3force(mlmd_path: str, pipeline_name: str, dict_of_art_ids: t.Dict[str, t.Dict[str, pd.DataFrame]]) -> dict:
     art_name_id = {}
     artifact_name_list = []
     query = cmfquery.CmfQuery(mlmd_path)
diff --git a/server/app/query_artifact_lineage_d3tree.py b/server/app/query_artifact_lineage_d3tree.py
index 8745188a..4f631788 100644
--- a/server/app/query_artifact_lineage_d3tree.py
+++ b/server/app/query_artifact_lineage_d3tree.py
@@ -4,7 +4,7 @@
 #from get_data import get_all_artifact_ids, get_all_exe_ids
 from typing import List, Dict, Any
 
-async def query_artifact_lineage_d3tree(mlmd_path: str, pipeline_name: str, dict_of_art_ids: Dict) -> List[List[Dict[str, Any]]]:
+def query_artifact_lineage_d3tree(mlmd_path: str, pipeline_name: str, dict_of_art_ids: dict) -> List[List[Dict[str, Any]]]:
     query = cmfquery.CmfQuery(mlmd_path)
     id_name = {}
     child_parent_artifact_id = {}
@@ -62,8 +62,8 @@ def modify_arti_name(arti_name, type):
         #first split on ':' then on '/' to get name. Example 'Test-env/prepare:uuid:32' -> prepare_uuid
         name = arti_name.split(':')[-3].split("/")[-1] + ":" + arti_name.split(':')[-2][:4]
     elif type == "Dataset":
-        # Example artifacts/data.xml.gz:236d9502e0283d91f689d7038b8508a2 -> data.xml.gz
-        name = arti_name.split(':')[-2] .split("/")[-1]
+        # Example artifacts/data.xml.gz:236d9502e0283d91f689d7038b8508a2 -> data.xml.gz:236d
+        name = arti_name.rsplit(':')[0].split("/")[-1] + ":" + arti_name.split(':')[-1][:4]
     elif type == "Dataslice":
         # cmf_artifacts/dataslices/ecd6dcde-4f3b-11ef-b8cd-f71a4cc9ba38/slice-1:e77e3466872898fcf2fa22a3752bc1ca
         dataslice_part1 = arti_name.split("/",1)[1]  #remove cmf_artifacts/
diff --git a/server/app/query_execution_lineage_d3force.py b/server/app/query_execution_lineage_d3force.py
index a054ccc4..cca2bdb0 100644
--- a/server/app/query_execution_lineage_d3force.py
+++ b/server/app/query_execution_lineage_d3force.py
@@ -2,7 +2,7 @@
 import pandas as pd
 from typing import Dict
 
-async def query_execution_lineage_d3force(mlmd_path, pipeline_name, dict_of_exe_ids, uuid_server) -> Dict:
+def query_execution_lineage_d3force(mlmd_path, pipeline_name, dict_of_exe_ids, uuid_server) -> Dict:
     """
     Creates data of executions for forced_directed_graph.
     Parameters:
diff --git a/server/app/query_execution_lineage_d3tree.py b/server/app/query_execution_lineage_d3tree.py
index 39252faf..5f0ad924 100644
--- a/server/app/query_execution_lineage_d3tree.py
+++ b/server/app/query_execution_lineage_d3tree.py
@@ -1,4 +1,3 @@
-import time, json
 from cmflib import cmfquery
 from collections import deque, defaultdict
 import pandas as pd
@@ -27,7 +26,7 @@
     def __contains__(self, value):
         return value in self.seen
 
-async def query_execution_lineage_d3tree(mlmd_path: str, pipeline_name: str, dict_of_exe_id, uuid):
+def query_execution_lineage_d3tree(mlmd_path: str, pipeline_name: str, dict_of_exe_id, uuid):
     query = cmfquery.CmfQuery(mlmd_path)
     pipeline_id = query.get_pipeline_id(pipeline_name)
     df=dict_of_exe_id[pipeline_name]
diff --git a/server/app/query_list_of_executions.py b/server/app/query_list_of_executions.py
index 50c95c48..3cd0eb21 100644
--- a/server/app/query_list_of_executions.py
+++ b/server/app/query_list_of_executions.py
@@ -1,12 +1,3 @@
-import itertools
-import re
-import networkx as nx
-from networkx.drawing.nx_agraph import graphviz_layout
-import pandas as pd
-from cmflib import cmfquery
-import dvc
-import json
-import random
 import warnings
 
 warnings.filterwarnings("ignore")
diff --git a/ui/src/pages/artifacts/ArtifactTypeSidebar.jsx b/ui/src/pages/artifacts/ArtifactTypeSidebar.jsx
index 0d83cfe3..c931ad05 100644
--- a/ui/src/pages/artifacts/ArtifactTypeSidebar.jsx
+++ b/ui/src/pages/artifacts/ArtifactTypeSidebar.jsx
@@ -22,15 +22,17 @@ import "./index.css";
 const ArtifactTypeSidebar = ({ artifactTypes, handleArtifactTypeClick }) => {
   const [clickedArtifactType, setClickedArtifactType] = useState(artifactTypes[0]);
-
   useEffect(() => {
     handleClick(artifactTypes[0]);
     // eslint-disable-next-line
+  }, []);
+
+  useEffect(() => {
+    setClickedArtifactType(artifactTypes[0]);
+    // eslint-disable-next-line
   }, [artifactTypes]);
-
   const handleClick = (artifactType) => {
-    setClickedArtifactType(artifactType);
-    handleArtifactTypeClick(artifactType);
+      setClickedArtifactType(artifactType);
+      handleArtifactTypeClick(artifactType);
   };
 
   return (
diff --git a/ui/src/pages/artifacts/index.jsx b/ui/src/pages/artifacts/index.jsx
index 2ff6125e..3288b2b1 100644
--- a/ui/src/pages/artifacts/index.jsx
+++ b/ui/src/pages/artifacts/index.jsx
@@ -30,7 +30,9 @@ const client = new FastAPIClient(config);
 const Artifacts = () => {
   const [selectedPipeline, setSelectedPipeline] = useState(null);
   const [pipelines, setPipelines] = useState([]);
-  const [artifacts, setArtifacts] = useState([]);
+  // undefined state means artifacts data has not been fetched yet;
+  // when artifacts is null we display "No Data"
+  const [artifacts, setArtifacts] = useState(undefined);
   const [artifactTypes, setArtifactTypes] = useState([]);
   const [selectedArtifactType, setSelectedArtifactType] = useState(null);
   const [totalItems, setTotalItems] = useState(0);
@@ -57,14 +59,18 @@
   }, []);
 
   const handlePipelineClick = (pipeline) => {
-    setArtifacts(null);
+    if (selectedPipeline !== pipeline) { // reset to the loading state only when a different pipeline is clicked
+      setArtifacts(null);
+    }
     setSelectedPipeline(pipeline);
     setActivePage(1);
-    fetchArtifacts(pipeline, selectedArtifactType, activePage, sortField, sortOrder, filterBy, filterValue);
+
   };
 
   const handleArtifactTypeClick = (artifactType) => {
-    setArtifacts(null);
+    if (selectedArtifactType !== artifactType) { // if a different artifact type is clicked, reset the page until data for that type arrives
+      setArtifacts(null);
+    }
     setSelectedArtifactType(artifactType);
     setActivePage(1);
   };
@@ -72,7 +78,7 @@
   const fetchArtifactTypes = () => {
     client.getArtifactTypes().then((types) => {
       setArtifactTypes(types);
-      handleArtifactTypeClick(types[0]);
+      fetchArtifacts(selectedPipeline, types[0], activePage, sortField, sortOrder, filterBy, filterValue);
     });
   };
 
@@ -84,17 +90,20 @@
   }, [selectedPipeline]);
 
   const fetchArtifacts = (pipelineName, type, page, sortField, sortOrder, filterBy, filterValue) => {
+    setArtifacts(undefined);
+    // if the request succeeds, set artifacts with the data; otherwise set it to null
     client.getArtifacts(pipelineName, type, page, sortField, sortOrder, filterBy, filterValue).then((data) => {
       setArtifacts(data.items);
       setTotalItems(data.total_items);
-    });
+    })
+    .catch(() => setArtifacts(null));
   };
 
   useEffect(() => {
     if (selectedPipeline && selectedArtifactType) {
       fetchArtifacts(selectedPipeline, selectedArtifactType, activePage, sortField, sortOrder, filterBy, filterValue);
     }
-  }, [selectedPipeline, selectedArtifactType, activePage, sortField, sortOrder, filterBy, filterValue]);
+  }, [selectedArtifactType, activePage, sortField, sortOrder, filterBy, filterValue]);
 
   const handlePageClick = (page) => {
     setActivePage(page);
@@ -150,12 +159,16 @@ const Artifacts = () => {
           )}
-          {selectedPipeline !== null && selectedArtifactType !== null && artifacts !== null && artifacts !== {} && (
-
-          )}
-
-          {artifacts !== null && totalItems > 0 && (
-            <>
+          {selectedPipeline !== null && selectedArtifactType !== null ? ( artifacts === undefined ? (
+            Loading....
+          ): artifacts === null || artifacts.length === 0 ? (
+            No Data
+          ):(
+            <>
+
+
+              {totalItems > 0 && (
-
+
               )}
+
+              )
+          ): null}
-
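The merge strategy introduced in cmflib/cmf_merger.py above reduces to one pattern: every create call (context, execution, artifact) is attempted first, and if a concurrent push already created the same record, the resulting AlreadyExistsError downgrades the operation to an update of custom properties rather than failing the whole merge. A hedged sketch of that pattern, where create_fn and update_fn are illustrative stand-ins rather than cmflib API:

```python
from ml_metadata.errors import AlreadyExistsError

def merge_or_update(create_fn, update_fn):
    # Try the create path first; if a concurrent push already created the
    # record, fall back to updating it instead of aborting the merge.
    try:
        return create_fn()
    except AlreadyExistsError:
        return update_fn()
```

Combined with the per-pipeline lock in /mlmd_push, this makes simultaneous pushes of the same pipeline converge on one set of records instead of corrupting the store.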