Skip to content

Commit

Permalink
Tangled tree visualization (#176)
Browse files Browse the repository at this point in the history
* adding tangled tree visualization to execution lineage

* adding changes for tangled tree visualization

* update comments and cleanup

* resolving some bugs realized due to big mlmd file

* fixed exec_name going out of the svg, made splitting of the exec_name uuid more generic

* restricting tree inside svg container

* fixed a bug

* changing count with enumerate method

* updating some changes

---------

Co-authored-by: Abhinav Chobey <chobey@abhinav-cmf-hpe.labs.hpecorp.net>
Co-authored-by: Abhinav Chobey <chobey@varkha-test-cors-machine.labs.hpecorp.net>
Co-authored-by: Varkha Sharma <112053040+varkha-d-sharma@users.noreply.github.com>
  • Loading branch information
4 people authored Jul 24, 2024
1 parent 49ae445 commit fcc78d0
Show file tree
Hide file tree
Showing 10 changed files with 519 additions and 14 deletions.
38 changes: 38 additions & 0 deletions cmflib/cmfquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,44 @@ def get_one_hop_parent_executions(self, execution_id: t.List[int], pipeline_id:
list_exec.append(self.store.get_executions_by_id(exec))
return list_exec

def get_one_hop_parent_executions_ids(self, execution_id: t.List[int], pipeline_id: str = None) -> t.List[int]:
    """Collect the ids of the executions that produced this execution's inputs.

    Args:
        execution_id: Execution id(s) whose parents are wanted, passed as a
            list, for example ``[1]``.
        pipeline_id: Pipeline id, forwarded unchanged to
            ``_get_executions_by_output_artifact_id``.
    Return:
        A flat list of parent execution ids, one entry per
        (input artifact, producing execution) pair, so the same id may occur
        more than once. ``None`` is returned when no input artifacts are
        found.
    """
    input_artifacts = self._get_input_artifacts(execution_id)
    if not input_artifacts:
        # Callers rely on None (not an empty list) to detect "no parents".
        return None

    return [
        parent_id
        for artifact_id in input_artifacts
        for parent_id in self._get_executions_by_output_artifact_id(artifact_id, pipeline_id)
    ]

def get_executions_with_execution_ids(self, exe_ids: t.List[int]):
    """Return id, type name and uuid for the executions with the given ids.

    Args:
        exe_ids: List of execution ids.
    Return:
        DataFrame with the columns
        ["id", "Execution_type_name", "Execution_uuid"], one row per distinct
        execution returned by the store. The frame is empty (but still has
        these columns) when the store returns no executions.
    """
    columns = ["id", "Execution_type_name", "Execution_uuid"]

    # Take the id from the execution itself instead of exe_ids[position]:
    # the store is not guaranteed to return executions index-aligned with
    # the request (unknown ids are simply left out of the result).
    frames = [
        self._transform_to_dataframe(exe, {"id": exe.id})
        for exe in self.store.get_executions_by_id(exe_ids)
    ]
    if not frames:
        return pd.DataFrame(columns=columns)

    # Concatenate once rather than growing a frame inside the loop.
    df = pd.concat(frames, sort=True, ignore_index=True)
    # drop_duplicates returns a new frame; its result has to be kept.
    return df[columns].drop_duplicates().reset_index(drop=True)

def get_one_hop_child_executions(self, execution_id: t.List[int]) -> t.List[int]:
"""Get artifacts produced by executions that consume given artifact.
Expand Down
19 changes: 19 additions & 0 deletions server/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
)
from server.app.query_visualization import query_visualization
from server.app.query_exec_lineage import query_exec_lineage
from server.app.query_tangled_lineage import query_tangled_lineage
from pathlib import Path
import os
import json
Expand All @@ -40,6 +41,7 @@ async def lifespan(app: FastAPI):
dict_of_art_ids = await get_all_artifact_ids(server_store_path)
# loaded execution ids with names into memory
dict_of_exe_ids = await get_all_exe_ids(server_store_path)

yield
dict_of_art_ids.clear()
dict_of_exe_ids.clear()
Expand Down Expand Up @@ -161,6 +163,7 @@ async def display_artifact_lineage(request: Request, pipeline_name: str):
query = cmfquery.CmfQuery(server_store_path)
if (pipeline_name in query.get_pipeline_names()):
response=await get_lineage_data(server_store_path,pipeline_name,"Artifacts",dict_of_art_ids,dict_of_exe_ids)
#response = null
return response
else:
return f"Pipeline name {pipeline_name} doesn't exist."
Expand Down Expand Up @@ -203,6 +206,22 @@ async def display_exec_lineage(request: Request, exec_type: str, pipeline_name:
else:
response = None
return response

@app.get("/display_tree_lineage/{uuid}/{pipeline_name}")
async def display_tree_lineage(request: Request,uuid, pipeline_name: str):
    '''
    Returns the tangled-tree lineage data for one execution of a pipeline.

    uuid          : short execution uuid taken from the URL; it is forwarded
                    to query_tangled_lineage to pick the execution.
    pipeline_name : name of the pipeline the execution belongs to.

    The response is whatever query_tangled_lineage produces. None is returned
    when the mlmd file is missing on the server or when the pipeline name is
    not known to the store.
    '''
    # Without an mlmd file on the server there is nothing to query.
    if not os.path.exists(server_store_path):
        return None

    query = cmfquery.CmfQuery(server_store_path)
    if pipeline_name not in query.get_pipeline_names():
        return None

    return await query_tangled_lineage(server_store_path, pipeline_name, dict_of_exe_ids, uuid)

# api to display artifacts available in mlmd
@app.get("/display_artifacts/{pipeline_name}/{type}")
Expand Down
90 changes: 90 additions & 0 deletions server/app/query_tangled_lineage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import os
from cmflib import cmfquery
from collections import deque, defaultdict
import pandas as pd

async def query_tangled_lineage(mlmd_path,pipeline_name, dict_of_exe_id,uuid):
    """Build the tangled-tree data for one execution and all of its ancestors.

    Args:
        mlmd_path: Path of the mlmd file handed to ``cmfquery.CmfQuery``.
        pipeline_name: Name of the pipeline; also the key into
            ``dict_of_exe_id``.
        dict_of_exe_id: Mapping of pipeline name to a DataFrame that has at
            least the columns "id" and "Execution_uuid".
        uuid: Value compared against the first four characters of
            "Execution_uuid" to select the starting execution.
    Returns:
        The output of ``topological_sort``: a list of groups, each a list of
        ``{'id': <name_uuid>, 'parents': [<name_uuid>, ...]}`` dictionaries,
        for example::

            [[{'id': 'Prepare_d09f', 'parents': []}],
             [{'id': 'Featurize_fae6', 'parents': ['Prepare_d09f']}],
             [{'id': 'Train_7fe7', 'parents': ['Featurize_fae6']}]]

        An empty list is returned when no execution matches ``uuid``.
    """
    query = cmfquery.CmfQuery(mlmd_path)
    pipeline_id = query.get_pipeline_id(pipeline_name)
    exe_df = dict_of_exe_id[pipeline_name]

    # Select the execution whose Execution_uuid starts with the 4-character uuid.
    matches = exe_df[exe_df['Execution_uuid'].str[:4] == uuid]
    execution_ids = matches["id"].tolist()
    if not execution_ids:
        # Unknown uuid: nothing to draw (indexing [0] would raise IndexError).
        return []
    # If several executions share the prefix, only the first one is traced.
    root_id = execution_ids[0]

    # Breadth-first walk from the selected execution up through its ancestors.
    dict_parents = {}      # {execution id: [parent execution ids]}
    visited = {root_id}    # every execution id that has been queued so far
    queue = deque([root_id])
    while queue:
        exe_id = queue.popleft()
        parents = query.get_one_hop_parent_executions_ids([exe_id], pipeline_id) or []
        unique_parents = list(set(parents))
        dict_parents[exe_id] = unique_parents
        for parent_id in unique_parents:
            # Queue each execution only once; otherwise shared ancestors are
            # re-queried over and over and a cyclic dependency never ends.
            if parent_id not in visited:
                visited.add(parent_id)
                queue.append(parent_id)

    # Fetch "id", "Execution_type_name" and "Execution_uuid" for every visited execution.
    df = query.get_executions_with_execution_ids(list(visited))
    df['name_uuid'] = df['Execution_type_name'] + '_' + df['Execution_uuid']
    # {id: name_uuid}, for example {2: "Test-env/Prepare_d09fdb26-0e9d-11ef-944f-4bf54f5aca7f"}
    result_dict = df.set_index('id')['name_uuid'].to_dict()

    # Order the executions from parents to children and group them.
    return topological_sort(dict_parents, result_dict)

def topological_sort(input_data,execution_id_dict):
    """Order executions from parents to children and group them by parent set.

    Args:
        input_data: ``{child_id: [parent_ids]}``, for example ``{4: [3, 7, 9]}``.
        execution_id_dict: ``{execution_id: name_uuid}`` used to turn ids into
            display names through ``modify_exec_name``.
    Returns:
        A list of groups. Each group is a list of
        ``{'id': <name>, 'parents': [<names>]}`` dictionaries for nodes that
        share exactly the same set of parents; groups appear in the order in
        which their first node is reached by the sort. Nodes whose parent
        count never drops to zero (a cycle, or a parent that is not a key of
        ``input_data``) are left out.
    """
    # Kahn's algorithm: count unresolved parents and remember each node's children.
    children_of = defaultdict(list)
    pending_parents = dict.fromkeys(input_data, 0)
    for child, parent_ids in input_data.items():
        for parent_id in parent_ids:
            children_of[parent_id].append(child)
            pending_parents[child] += 1

    # Start with the nodes that have no parents at all.
    ready = deque(node for node, count in pending_parents.items() if count == 0)
    ordered_nodes = []
    while ready:
        node = ready.popleft()
        ordered_nodes.append(node)
        for child in children_of[node]:
            pending_parents[child] -= 1
            if pending_parents[child] == 0:
                ready.append(child)

    # Nodes with the same (sorted) parents end up in the same group.
    groups = {}
    for node in ordered_nodes:
        parent_ids = input_data[node]
        group_key = tuple(sorted(parent_ids))
        members = groups.setdefault(group_key, [])
        members.append({
            'id': modify_exec_name(execution_id_dict[node]),
            'parents': [modify_exec_name(execution_id_dict[parent_id]) for parent_id in parent_ids],
        })
    return list(groups.values())

def modify_exec_name(exec_name_uuid):
    """Shorten '<pipeline>/<name>_<uuid>' to '<name>_<first 4 uuid chars>'.

    Example:
        'Test-env/Prepare_d09fdb26-0e9d-11ef-944f-4bf54f5aca7f' -> 'Prepare_d09f'

    Args:
        exec_name_uuid: Execution type name joined to the execution uuid with
            an underscore. Everything up to the first '/' is dropped; a value
            without any '/' is used as it is instead of raising IndexError.
    Returns:
        The execution name (text before the last underscore) followed by '_'
        and at most four characters of the first uuid segment.
    """
    _, slash, remainder = exec_name_uuid.partition('/')
    if not slash:
        # No pipeline prefix present: keep the whole string.
        remainder = exec_name_uuid
    # The name itself may contain underscores, so split on the last one only.
    name, _, full_uuid = remainder.rpartition('_')
    short_uuid = full_uuid.split('-')[0][:4]
    return name + "_" + short_uuid
2 changes: 1 addition & 1 deletion server/app/query_visualization_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def query_visualization_execution(mlmd_path, pipeline_name, dict_of_art_ids, dic
list_of_exec = dict_of_exe_ids[pipeline_name]["Context_Type"].tolist()
list_of_uuid = dict_of_exe_ids[pipeline_name]["Execution_uuid"].tolist()
for exec_type, uuid in zip(list_of_exec, list_of_uuid):
list_of_exec_uuid.append(exec_type + "_" + uuid.split("-")[0][:4])
list_of_exec_uuid.append(exec_type.split("/",1)[1] + "_" + uuid.split("-")[0][:4])
return list_of_exec_uuid

#print(query_visualization_execution("/home/chobey/cmf-server/data/mlmd","image"))
1 change: 1 addition & 0 deletions ui/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ ENV PATH /app/node_modules/.bin:$PATH
COPY package.json ./
COPY package-lock.json ./
RUN npm install --silent
RUN npm install svg.js
RUN npm install d3
RUN npm install react-scripts@5.0.1 -g --silent
COPY . ./
Expand Down
8 changes: 8 additions & 0 deletions ui/src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ class FastAPIClient {
});
}

async getExecTreeLineage(pipeline,uuid) {
return this.apiClient.get(`/display_tree_lineage/${uuid}/${pipeline}`)
.then(({ data }) => {
return data;
});
}


async getExecutions(pipelineName, page, sortField, sortOrder , filterBy, filterValue) {
return this.apiClient
.get(`/display_executions/${pipelineName}`, {
Expand Down
21 changes: 21 additions & 0 deletions ui/src/components/ExecutionTangledDropdown/index.css
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/* Wrapper around the select element; inline-block lets it sit inline
   with neighbouring content while sizing to the select it contains. */
.dropdown {
position: relative;
display: inline-block;
}

/* The select itself: native platform styling is switched off
   (appearance: none) and replaced with a bordered, rounded box. */
.dropdown-select {
appearance: none;
padding: 8px 16px;
font-size: 14px;
border: 1px solid #ccc;
border-radius: 4px;
/* The default focus outline is removed; the border colour change in the
   rule below is the only hover/focus indicator. */
outline: none;
cursor: pointer;
/* Animate the border colour change on hover/focus. */
transition: border-color 0.3s ease;
}

/* Highlight the border while the select is hovered or focused. */
.dropdown-select:hover,
.dropdown-select:focus {
border-color: #4a90e2;
}

37 changes: 37 additions & 0 deletions ui/src/components/ExecutionTangledDropdown/index.jsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import React, { useState,useEffect } from "react";

const ExecutionTangledDropdown = ({data,exec_type,handleTreeClick}) => {
const [selectedExecutionType, setSelectedExecutionType] = useState('');

useEffect(() => {
if (exec_type) {
setSelectedExecutionType(exec_type);
}
}, [exec_type]);

const handleCallExecutionClick = (event) => {
handleTreeClick(event.target.value);
};

return (
<div className= "dropdown">
<select
className= "dropdown-select"
value= {selectedExecutionType}
onChange={(event) => { handleCallExecutionClick(event); }}
>
{data.map((type, index) => {
return (
<option key={index} value={type}>
{type}
</option>
);
})}
</select>
</div>

);
};

export default ExecutionTangledDropdown;

Loading

0 comments on commit fcc78d0

Please sign in to comment.