diff --git a/cmflib/bin/cmf_dvc_ingest.py b/cmflib/bin/cmf_dvc_ingest.py index 66210fac..1a733145 100644 --- a/cmflib/bin/cmf_dvc_ingest.py +++ b/cmflib/bin/cmf_dvc_ingest.py @@ -47,14 +47,16 @@ def get_cmf_hierarchy(execution_lineage:str): execution_exist : True if it exeist, False otherwise metawrite: cmf object """ -def ingest_metadata(execution_lineage:str, metadata:dict, execution_exist:bool, metawriter:cmf.Cmf, command:str = "") : +def ingest_metadata(execution_lineage:str, metadata:dict, metawriter:cmf.Cmf, command:str = "") : pipeline_name, context_name, execution = get_cmf_hierarchy(execution_lineage) - - if execution_exist: - _ = metawriter.update_execution(int(execution)) - else: - _ = metawriter.create_execution(str(context_name) + '_' + str(uuid_), {}, command) + _ = metawriter.create_execution( + str(context_name) + '_' + str(execution), + {}, + cmd = str(command), + create_new_execution=False + ) + for k, v in metadata.items(): if k == "deps": for dep in v: @@ -108,7 +110,7 @@ def find_location(string, elements): outs = [] k_dict = {} i = 0 - + for kk in valuesYaml['stages'][k]: if kk == 'cmd': cmd_list = valuesYaml['stages'][k][kk].split() @@ -123,18 +125,18 @@ def find_location(string, elements): k_dict['outs'] = outs pipeline_dict[k][str(i)] = k_dict + """ Create a unique Pipeline name if there is no mlmd file """ -pipeline_name = "Pipeline"+"-"+str(uuid.uuid4()) if not pipeline_name else pipeline_name +pipeline_name = "Pipeline"+"-"+str(uuid_) if not pipeline_name else pipeline_name metawriter = cmf.Cmf(filename="mlmd", pipeline_name=pipeline_name, graph=True) """ Parse the dvc.lock dictionary and get the command section """ - for k, v in pipeline_dict.items(): for kk, vv in v.items(): for kkk, vvv in vv.items(): @@ -150,32 +152,20 @@ def find_location(string, elements): if the pipeline_dict command is already there in the cmd_exe dict got from parsing the mlmd pop that cmd out and use the stored lineage from the mlmd """ + vvv.pop(0) pos = find_location('--execution_name', vvv) if pos: - cmd = cmd_exe.get(str(k) + '_' +str(vvv[pos + 1]), None) + execution_name = vvv[pos+1] else: - cmd = cmd_exe.get(str(k) + '_' +str(uuid_), None) - - if cmd is not None: - """ - cmd(lineage) - eg - '1,eval,active_learning ' - format - execution_id, context, pipeline - """ - context_name = k - _ = metawriter.create_context(pipeline_stage=context_name) - ingest_metadata(cmd, vv, True, metawriter) - else: - """ - Construct the stage and execution type from the dvc.lock file - lineage eg - execution_file, context, pipeline - """ - context_name = k - execution_name = vvv[-1] - lineage = execution_name+","+context_name+","+ pipeline_name - _ = metawriter.create_context(pipeline_stage=context_name) - if pos: - ingest_metadata(lineage, vv, False, metawriter, str(k) + '_'+str(vvv[pos + 1])) - else: - ingest_metadata(lineage, vv, False, metawriter, str(k) + '_'+str(uuid_)) + execution_name = uuid_ + + context_name = k + lineage = execution_name+","+context_name+","+ pipeline_name + + cmd = cmd_exe.get(str(' '.join(vvv)), None) + _ = metawriter.create_context(pipeline_stage=context_name) + + ingest_metadata(lineage, vv, metawriter, str(' '.join(vvv))) + metawriter.log_dvc_lock("dvc.lock") diff --git a/cmflib/cmf.py b/cmflib/cmf.py index 7528fc94..975a136e 100644 --- a/cmflib/cmf.py +++ b/cmflib/cmf.py @@ -391,7 +391,7 @@ def create_execution( self.execution_name, self.child_context.id, self.parent_context, - str(sys.argv), + cmd, self.execution.id, custom_props, )