Skip to content

Commit

Permalink
updated create_execution and cmf_dvc_ingest (#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
rishabhsharma22 authored Apr 29, 2024
1 parent 9a532df commit 3db6310
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 34 deletions.
56 changes: 23 additions & 33 deletions cmflib/bin/cmf_dvc_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,16 @@ def get_cmf_hierarchy(execution_lineage:str):
execution_exist : True if it exeist, False otherwise
metawrite: cmf object
"""
def ingest_metadata(execution_lineage:str, metadata:dict, execution_exist:bool, metawriter:cmf.Cmf, command:str = "") :
def ingest_metadata(execution_lineage:str, metadata:dict, metawriter:cmf.Cmf, command:str = "") :
pipeline_name, context_name, execution = get_cmf_hierarchy(execution_lineage)

if execution_exist:
_ = metawriter.update_execution(int(execution))
else:
_ = metawriter.create_execution(str(context_name) + '_' + str(uuid_), {}, command)

_ = metawriter.create_execution(
str(context_name) + '_' + str(execution),
{},
cmd = str(command),
create_new_execution=False
)

for k, v in metadata.items():
if k == "deps":
for dep in v:
Expand Down Expand Up @@ -108,7 +110,7 @@ def find_location(string, elements):
outs = []
k_dict = {}
i = 0

for kk in valuesYaml['stages'][k]:
if kk == 'cmd':
cmd_list = valuesYaml['stages'][k][kk].split()
Expand All @@ -123,18 +125,18 @@ def find_location(string, elements):
k_dict['outs'] = outs

pipeline_dict[k][str(i)] = k_dict

"""
Create a unique Pipeline name if there is no mlmd file
"""
pipeline_name = "Pipeline"+"-"+str(uuid.uuid4()) if not pipeline_name else pipeline_name


pipeline_name = "Pipeline"+"-"+str(uuid_) if not pipeline_name else pipeline_name
metawriter = cmf.Cmf(filename="mlmd", pipeline_name=pipeline_name, graph=True)

"""
Parse the dvc.lock dictionary and get the command section
"""

for k, v in pipeline_dict.items():
for kk, vv in v.items():
for kkk, vvv in vv.items():
Expand All @@ -150,32 +152,20 @@ def find_location(string, elements):
if the pipeline_dict command is already there in the cmd_exe dict got from parsing the mlmd pop that cmd out
and use the stored lineage from the mlmd
"""
vvv.pop(0)
pos = find_location('--execution_name', vvv)
if pos:
cmd = cmd_exe.get(str(k) + '_' +str(vvv[pos + 1]), None)
execution_name = vvv[pos+1]
else:
cmd = cmd_exe.get(str(k) + '_' +str(uuid_), None)

if cmd is not None:
"""
cmd(lineage) - eg - '1,eval,active_learning '
format - execution_id, context, pipeline
"""
context_name = k
_ = metawriter.create_context(pipeline_stage=context_name)
ingest_metadata(cmd, vv, True, metawriter)
else:
"""
Construct the stage and execution type from the dvc.lock file
lineage eg - execution_file, context, pipeline
"""
context_name = k
execution_name = vvv[-1]
lineage = execution_name+","+context_name+","+ pipeline_name
_ = metawriter.create_context(pipeline_stage=context_name)
if pos:
ingest_metadata(lineage, vv, False, metawriter, str(k) + '_'+str(vvv[pos + 1]))
else:
ingest_metadata(lineage, vv, False, metawriter, str(k) + '_'+str(uuid_))
execution_name = uuid_

context_name = k
lineage = execution_name+","+context_name+","+ pipeline_name

cmd = cmd_exe.get(str(' '.join(vvv)), None)
_ = metawriter.create_context(pipeline_stage=context_name)

ingest_metadata(lineage, vv, metawriter, str(' '.join(vvv)))


metawriter.log_dvc_lock("dvc.lock")
2 changes: 1 addition & 1 deletion cmflib/cmf.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ def create_execution(
self.execution_name,
self.child_context.id,
self.parent_context,
str(sys.argv),
cmd,
self.execution.id,
custom_props,
)
Expand Down

0 comments on commit 3db6310

Please sign in to comment.