# cmflib/cmfquery.py -- methods added to the query class (both take ``self``).


def get_one_hop_parent_executions_ids(
    self, execution_id: t.List[int], pipeline_id: t.Optional[str] = None
) -> t.Optional[t.List[int]]:
    """Get the ids of the executions that produced this execution's inputs.

    Args:
        execution_id: Execution id for which parent executions are required.
            It is passed as a list, for example ``[1]``.
        pipeline_id: Pipeline id, forwarded unchanged to
            ``_get_executions_by_output_artifact_id``.

    Returns:
        The parent execution ids, each listed once in first-seen order, or
        ``None`` when the execution has no input artifacts (callers test for
        ``None``, so that sentinel is kept).
    """
    artifacts = self._get_input_artifacts(execution_id)
    if not artifacts:
        return None

    # A dict is used as an insertion-ordered set: one parent that produced
    # several of the inputs must be reported once, not once per artifact.
    parent_ids: t.Dict[int, None] = {}
    for artifact_id in artifacts:
        for parent_id in self._get_executions_by_output_artifact_id(artifact_id, pipeline_id):
            parent_ids.setdefault(parent_id, None)
    return list(parent_ids)


def get_executions_with_execution_ids(self, exe_ids: t.List[int]) -> pd.DataFrame:
    """Return a summary data frame for the given execution ids.

    Args:
        exe_ids: List of execution ids.

    Returns:
        A data frame with exactly the columns ``id``, ``Execution_type_name``
        and ``Execution_uuid``, one row per distinct execution returned by the
        store. The frame is empty (but has those columns) when the store
        returns nothing.
    """
    columns = ["id", "Execution_type_name", "Execution_uuid"]

    # The id is read from the execution itself instead of from
    # ``exe_ids[position]``: the store is not guaranteed to answer with one
    # execution per requested id in request order (ML Metadata documents the
    # result as "not index-aligned"), so positional pairing can mislabel rows.
    # NOTE(review): assumes every returned execution exposes ``.id``.
    frames = [
        self._transform_to_dataframe(exe, {"id": exe.id})
        for exe in self.store.get_executions_by_id(exe_ids)
    ]
    if not frames:
        return pd.DataFrame(columns=columns)

    # Concatenate once rather than growing a frame inside the loop.
    df = pd.concat(frames, sort=True, ignore_index=True)
    # drop_duplicates() returns a new frame; its result has to be kept.
    return df[columns].drop_duplicates().reset_index(drop=True)
# ---- server/app/main.py ----
from server.app.query_tangled_lineage import query_tangled_lineage


@app.get("/display_tree_lineage/{uuid}/{pipeline_name}")
async def display_tree_lineage(request: Request, uuid, pipeline_name: str):
    '''
    Returns the ancestry of one execution, grouped for a tree view.

    uuid          : prefix of the Execution_uuid that selects the execution
    pipeline_name : pipeline the execution belongs to

    response = [
                [{id: "Prepare_d09f", parents: []}],
                [{id: "Featurize_fae6", parents: ["Prepare_d09f"]}],
               ]
    The response is None when the mlmd file does not exist on the server or
    the pipeline name is unknown.
    '''
    # Default result, so that the return below never reads an unbound name.
    response = None
    # checks if mlmd file exists on server
    if os.path.exists(server_store_path):
        query = cmfquery.CmfQuery(server_store_path)
        if pipeline_name in query.get_pipeline_names():
            response = await query_tangled_lineage(server_store_path, pipeline_name, dict_of_exe_ids, uuid)
    return response


# ---- server/app/query_tangled_lineage.py ----
import os
from cmflib import cmfquery
from collections import deque, defaultdict
import pandas as pd


async def query_tangled_lineage(mlmd_path, pipeline_name, dict_of_exe_id, uuid):
    """Collect every ancestor execution of one execution and group them.

    Args:
        mlmd_path: Path of the mlmd file handed to ``cmfquery.CmfQuery``.
        pipeline_name: Key into ``dict_of_exe_id`` and name used to look up
            the pipeline id.
        dict_of_exe_id: Mapping of pipeline name to a data frame that has at
            least the columns ``id`` and ``Execution_uuid``.
        uuid: Value compared with the first four characters of
            ``Execution_uuid`` to select the starting execution.

    Returns:
        The output of ``topological_sort``: a list of groups, each group a
        list of ``{'id': <name>, 'parents': [<name>, ...]}`` dictionaries.
        An empty list is returned when no execution matches ``uuid``.
    """
    query = cmfquery.CmfQuery(mlmd_path)
    pipeline_id = query.get_pipeline_id(pipeline_name)
    executions_df = dict_of_exe_id[pipeline_name]

    # Select the execution(s) whose Execution_uuid starts with ``uuid``.
    matches = executions_df[executions_df['Execution_uuid'].str[:4] == uuid]
    execution_id = matches["id"].tolist()
    if not execution_id:
        # Nothing matched: there is no lineage to build (previously IndexError).
        return []

    root_id = execution_id[0]
    visited = {root_id}   # every execution id that belongs to the lineage
    dict_parents = {}     # {execution id: [parent execution ids]}
    queue = deque()

    def record_parents(node_id, lookup_ids):
        # The query returns None when there are no parents; treat it as empty.
        parents = query.get_one_hop_parent_executions_ids(lookup_ids, pipeline_id) or []
        unique_parents = list(set(parents))
        dict_parents[node_id] = unique_parents
        for parent_id in unique_parents:
            # Enqueue each ancestor once. Without this check an ancestor
            # reachable along several paths is queried again for every path,
            # and a cycle in the recorded lineage would never terminate.
            if parent_id not in visited:
                visited.add(parent_id)
                queue.append(parent_id)

    # The first lookup uses the full list of matching ids, as before, and the
    # result is stored under the first match.
    record_parents(root_id, execution_id)
    while queue:
        exe_id = queue.popleft()
        record_parents(exe_id, [exe_id])

    # Fetch type name and uuid for every collected execution id.
    df = query.get_executions_with_execution_ids(list(visited))
    df['name_uuid'] = df['Execution_type_name'] + '_' + df['Execution_uuid']
    # {id: "name_uuid"}, for example {2: "Test-env/Prepare_d09fdb26-0e9d-11ef-944f-4bf54f5aca7f"}
    result_dict = df.set_index('id')['name_uuid'].to_dict()

    # Order the executions from parents to children and group them.
    return topological_sort(dict_parents, result_dict)
def topological_sort(input_data, execution_id_dict):
    """Order executions from parents to children and group siblings.

    Args:
        input_data: ``{child_id: [parent_id, ...]}``, for example
            ``{4: [3, 7, 9]}``.
        execution_id_dict: ``{execution_id: "<type name>_<uuid>"}``; it must
            contain every child id and every parent id (``KeyError``
            otherwise).

    Returns:
        A list of groups. Each group holds the children that share the same
        set of parents, as ``{'id': <short name>, 'parents': [<short name>]}``
        dictionaries, in topological order. Example::

            [[{'id': 'Prepare_d09f', 'parents': []}],
             [{'id': 'Featurize_fae6', 'parents': ['Prepare_d09f']}],
             [{'id': 'Train_7fe7', 'parents': ['Featurize_fae6']}]]

        Executions that are part of a dependency cycle never reach in-degree
        zero and are therefore left out.
    """
    # Initialize in-degree of all nodes to 0
    in_degree = {node: 0 for node in input_data}
    # parent -> children adjacency list
    adj_list = defaultdict(list)

    for node, dependencies in input_data.items():
        for dep in dependencies:
            adj_list[dep].append(node)
            in_degree[node] += 1
            # A parent that is not itself a key still has to take part in the
            # ordering; otherwise its children never reach in-degree zero and
            # silently vanish from the result together with their descendants.
            in_degree.setdefault(dep, 0)

    # Kahn's algorithm: repeatedly emit nodes whose parents were all emitted.
    zero_in_degree_queue = deque(node for node, degree in in_degree.items() if degree == 0)
    topo_sorted_nodes = []
    while zero_in_degree_queue:
        current_node = zero_in_degree_queue.popleft()
        topo_sorted_nodes.append(current_node)
        for neighbor in adj_list[current_node]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                zero_in_degree_queue.append(neighbor)

    # Group children by their (sorted) parents so siblings end up in one list.
    parent_dict = defaultdict(list)
    for id_val in topo_sorted_nodes:
        if id_val not in input_data:
            # Ordering-only node: it was seen as a parent but has no entry.
            continue
        parent_ids = input_data[id_val]
        parent_dict[tuple(sorted(parent_ids))].append({
            'id': modify_exec_name(execution_id_dict[id_val]),
            'parents': [modify_exec_name(execution_id_dict[parent]) for parent in parent_ids],
        })
    return list(parent_dict.values())


def modify_exec_name(exec_name_uuid):
    """Shorten ``"<pipeline>/<name>_<uuid>"`` to ``"<name>_<first 4 of uuid>"``.

    Example: ``'Test-env/Prepare_d09fdb26-0e9d-11ef-944f-4bf54f5aca7f'``
    becomes ``'Prepare_d09f'``. A value without a ``/`` is treated as having
    no pipeline prefix instead of raising ``IndexError``.
    """
    _, slash, remainder = exec_name_uuid.partition('/')
    if not slash:
        remainder = exec_name_uuid
    # Everything before the last underscore is the name (it may itself
    # contain underscores); what follows the last underscore is the uuid.
    name = remainder.rpartition('_')[0]
    short_uuid = exec_name_uuid.rpartition('_')[2].partition('-')[0][:4]
    return name + "_" + short_uuid
./ diff --git a/ui/src/client.js b/ui/src/client.js index 1ce882ac..a439c7c6 100644 --- a/ui/src/client.js +++ b/ui/src/client.js @@ -84,6 +84,14 @@ class FastAPIClient { }); } + async getExecTreeLineage(pipeline,uuid) { + return this.apiClient.get(`/display_tree_lineage/${uuid}/${pipeline}`) + .then(({ data }) => { + return data; + }); + } + + async getExecutions(pipelineName, page, sortField, sortOrder , filterBy, filterValue) { return this.apiClient .get(`/display_executions/${pipelineName}`, { diff --git a/ui/src/components/ExecutionTangledDropdown/index.css b/ui/src/components/ExecutionTangledDropdown/index.css new file mode 100644 index 00000000..c9cfb174 --- /dev/null +++ b/ui/src/components/ExecutionTangledDropdown/index.css @@ -0,0 +1,21 @@ +.dropdown { + position: relative; + display: inline-block; +} + +.dropdown-select { + appearance: none; + padding: 8px 16px; + font-size: 14px; + border: 1px solid #ccc; + border-radius: 4px; + outline: none; + cursor: pointer; + transition: border-color 0.3s ease; +} + +.dropdown-select:hover, +.dropdown-select:focus { + border-color: #4a90e2; +} + diff --git a/ui/src/components/ExecutionTangledDropdown/index.jsx b/ui/src/components/ExecutionTangledDropdown/index.jsx new file mode 100644 index 00000000..5e2898b5 --- /dev/null +++ b/ui/src/components/ExecutionTangledDropdown/index.jsx @@ -0,0 +1,37 @@ +import React, { useState,useEffect } from "react"; + +const ExecutionTangledDropdown = ({data,exec_type,handleTreeClick}) => { + const [selectedExecutionType, setSelectedExecutionType] = useState(''); + + useEffect(() => { + if (exec_type) { + setSelectedExecutionType(exec_type); + } + }, [exec_type]); + + const handleCallExecutionClick = (event) => { + handleTreeClick(event.target.value); + }; + + return ( +