Skip to content

Commit

Permalink
add secondary connector et value mapping consideration
Browse files Browse the repository at this point in the history
  • Loading branch information
simon louvet committed Mar 28, 2024
1 parent 8cde245 commit 1a6793b
Show file tree
Hide file tree
Showing 9 changed files with 338 additions and 113 deletions.
9 changes: 7 additions & 2 deletions core/lib/workspace_lib.js
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,7 @@ function _get_workspace(workspace_id) {
.lean()
.exec()
.then(async (workspaceIn) => {
// console.log('workspaceIn',workspaceIn);
if (workspaceIn == null) {
return reject(new Error.EntityNotFoundError('workspaceModel'))
}
Expand Down Expand Up @@ -1028,18 +1029,22 @@ function _get_workspace_graph_data(workspaceId) {

// --------------------------------------------------------------------------------

function _addConnection(workspaceId, source, target) {
function _addConnection(workspaceId, source, target, input) {

return new Promise((resolve, reject) => {
workspaceModel.getInstance().model.findOne({
_id: workspaceId
}).then(workspace => {
if (workspace == null) {
return reject(new Error.EntityNotFoundError('workspaceModel'))
}
console.log('input',input)
workspace.links.push({
source: source,
target: target
target: target,
targetInput : input
});
console.log('workspace add connection',workspace)
return workspaceModel.getInstance().model.findOneAndUpdate({
_id: workspace._id
},
Expand Down
3 changes: 2 additions & 1 deletion core/model_schemas/workspace_schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ var WorkspaceSchema = mongoose.Schema({
},
links: [{
source: String,
target: String
target: String,
targetInput: String
}],
users: [{
email: String,
Expand Down
67 changes: 25 additions & 42 deletions engine/services/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ class Engine {
queryParams: this.originQueryParams
},
undefined
)
);
// console.log('this.pathResolution',this.pathResolution.nodes)
this.pathResolution.nodes.forEach(n=>{
console.log(n)
})

if (this.config.quietLog != true) {
console.log(' ---------- BuildPath Links-----------', this.fackCounter, this.workflow.name)
Expand Down Expand Up @@ -253,7 +257,6 @@ class Engine {
let secondaryFlow
let componentFlow={}
if (nodesProcessingInputs.length > 0) {

let persistedDataFlow = [];
for (const sourceNode of nodesProcessingInputs) {
// console.log('sourceNode',sourceNode);
Expand All @@ -280,11 +283,13 @@ class Engine {

// }
const previousDfob = persistedDataFlowCoponent.dfob ? persistedDataFlowCoponent.dfob : undefined;
const targetInput = sourceNode.targets.find(t=>t.target.component._id==processingNode.component._id).targetInput;
persistedDataFlow.push({
// data: persistedData ? persistedData : undefined,
fragment : persistedFragmentData,
componentId: sourceNode.component._id,
dfob: previousDfob,
targetInput: targetInput,
dfob: previousDfob
})
}

Expand All @@ -293,14 +298,14 @@ class Engine {
deeperFocusData : processingNode.component.deeperFocusData
}

// legacy compatibility with dfob as component
if(componentFlow.dataFlow.length==1 && (!componentFlow.deeperFocusData || Object.keys(componentFlow.deeperFocusData).length==0)){
// console.log('APPPLY dfob from source!!',componentFlow.dataFlow[0].dfob)
componentFlow.deeperFocusData=componentFlow.dataFlow[0].dfob;
}

// console.log('componentFlow?.deeperFocusData',componentFlow?.deeperFocusData)
// default DFOB
if((!componentFlow?.deeperFocusData ||Object.keys(componentFlow.deeperFocusData).length==0)&& (componentFlow?.deeperFocusData?.activateDf==undefined||componentFlow?.deeperFocusData?.activateDf==false)){
// console.log('DEFAULT DFOB!!')
componentFlow.deeperFocusData={
dfobPath:'',
keepArray:true
Expand All @@ -320,7 +325,7 @@ class Engine {
componentFlow.dataFlow
)
} else {
componentFlow.primaryflow = componentFlow.dataFlow[0]
componentFlow.primaryflow = componentFlow.dataFlow.filter(df=>df.targetInput==undefined)[0]
}

secondaryFlow = []
Expand Down Expand Up @@ -406,7 +411,8 @@ class Engine {
for (let sf of componentFlow.secondaryFlow){
secondaryFlowDefraged.push({
data : await this.fragment_lib.getWithResolutionByBranch(sf.fragment),
componentId : sf.componentId
componentId : sf.componentId,
targetInput : sf.targetInput
})
}
if (this.config.quietLog != true) console.timeEnd("secondary_getWithResolutionByBranch");
Expand Down Expand Up @@ -761,17 +767,17 @@ class Engine {
let module = this.technicalComponentDirectory[processingNode.component.module]
const {dfobTable,pipeNb, keepArray}=dfob
let rebuildData;
// console.log('___________rebuildFrag_focus_work_persist',dfob,fragment)


try {
// if (this.config.quietLog != true) console.time("primary_getWithResolutionByBranch");
// console.log('__________rebuild ',fragment)

rebuildData = await this.fragment_lib.getWithResolutionByBranch(fragment._id);
dfob=undefined;
// if (this.config.quietLog != true) console.timeEnd("primary_getWithResolutionByBranch");
// console.log('_______rebuildData',rebuildData)

// console.log('___fragment',fragment)


const needDfob = dfobTable.length>0 || (Array.isArray(rebuildData)&&!keepArray&&!fragment.branchOriginFrag);
if(needDfob){
// console.log('WITH DFOB',dfobTable);
Expand All @@ -782,18 +788,14 @@ class Engine {
undefined,
keepArray
)
// console.log('___________dfobFlow',JSON.stringify(dfobFlow))
let paramArray = dfobFlow.map(item => {
// console.log('__________ item :',item)
var recomposedFlow = [];
// console.log(finalItem.objectToProcess,finalItem.key);

recomposedFlow = recomposedFlow.concat([{
data: item.key != undefined ? item.objectToProcess[item.key] : item.objectToProcess,
componentId: primaryflow.componentId
}]);
recomposedFlow = recomposedFlow.concat(secondaryFlow);
// console.log('recomposedFlow',recomposedFlow);

return [
processingNode.component,
Expand All @@ -802,9 +804,6 @@ class Engine {
]
});
// if (this.config.quietLog != true) console.timeEnd("build-DfobFlow");

// console.log('__________ paramArray :',paramArray[0][1])
// console.log('__________module',module);
// if (this.config.quietLog != true) console.time("work");
const componentFlowDfob = await this.promiseOrchestrator.execute(module, module.pull, paramArray, {
beamNb: pipeNb,
Expand All @@ -819,16 +818,11 @@ class Engine {
}
}, this.config);
// if (this.config.quietLog != true) console.timeEnd("work");

// console.log('__________ componentFlowDfob :',componentFlowDfob)

// if (this.config.quietLog != true) console.time("recompose-DfobFlow");
// console.log('dfobFlow 1',dfobFlow)

for (var componentFlowDfobKey in componentFlowDfob) {
if ('data' in componentFlowDfob[componentFlowDfobKey]) {
// console.log('componentFlowDfobKey 1',componentFlowDfobKey)
if (dfobFlow[componentFlowDfobKey].key != undefined) {
// console.log('componentFlowDfobKey 2',componentFlowDfobKey)
dfobFlow[componentFlowDfobKey].objectToProcess[dfobFlow[componentFlowDfobKey].key] =
componentFlowDfob[componentFlowDfobKey].data
} else {
Expand All @@ -839,32 +833,26 @@ class Engine {
for (let key of Object.keys(componentFlowDfob[componentFlowDfobKey].data)) {
dfobFlow[componentFlowDfobKey].objectToProcess[key] = componentFlowDfob[componentFlowDfobKey].data[key];
}
// dfobFinalFlow[componentFlowDfobKey].objectToProcess=componentFlowDfob[componentFlowDfobKey].data;
}
} else if (componentFlowDfob[componentFlowDfobKey].error != undefined) {
dfobFlow[componentFlowDfobKey].objectToProcess[dfobFlow[componentFlowDfobKey].key] =
componentFlowDfob[componentFlowDfobKey]
}
}
// console.log('_________ dfobFlow 2',dfobFlow)
// if (this.config.quietLog != true) console.timeEnd("recompose-DfobFlow");
} else {
// console.log('WITHOUT DFOB');
let workResult
let recomposedFlow = [];
// console.log(finalItem.objectToProcess,finalItem.key);

recomposedFlow = recomposedFlow.concat([{
data: rebuildData,
componentId: primaryflow.componentId
}]);
recomposedFlow = recomposedFlow.concat(secondaryFlow);
// console.log('recomposedFlow',recomposedFlow);
// console.log('processingNode.component',processingNode.component);

// if (this.config.quietLog != true) console.time("work");
workResult = await module.pull(processingNode.component, recomposedFlow, processingNode.queryParams == undefined ? undefined : processingNode.queryParams.queryParams)
// if (this.config.quietLog != true) console.timeEnd("work");
// console.log('workResult',workResult);
rebuildData=workResult.data;
dfob = workResult.dfob
}
Expand All @@ -875,11 +863,6 @@ class Engine {
};
}



// console.log('______ rebuildData', rebuildData);
// console.log('_________fragment',fragment);

let pesristedFragment
try {
// console.log('BEFORE persist',rebuildData,fragment)
Expand All @@ -891,7 +874,6 @@ class Engine {
} catch (error) {
console.error("persist ERROR",error);
}
// console.log('______ pesristedFragment',pesristedFragment);
return {
frag : pesristedFragment, // not needed because frag of component ever known by main execution
dfob
Expand Down Expand Up @@ -992,7 +974,8 @@ class Engine {
target: buildPathNode,
requestDirection: 'pull',
queryParams: queryParams,
linkId: beforelink._id
linkId: beforelink._id,
targetInput: beforelink.targetInput
}
// linkToProcess.status='waiting';
buildPath.links.push(linkToProcess)
Expand Down Expand Up @@ -1046,13 +1029,13 @@ class Engine {
source: buildPathNode,
requestDirection: 'push',
queryParams: queryParams,
linkId: afterlink._id
linkId: afterlink._id,
targetInput: afterlink.targetInput
}
// linkToProcess.status='waiting';

buildPath.links.push(linkToProcess)
buildPathNode.targets.push(linkToProcess)
// console.log(linkToProcess);
// buildPath.push(out);

this.buildPathResolution(
workspace,
afterComponentObject,
Expand Down
24 changes: 18 additions & 6 deletions engine/workspaceComponentExecutor/valueMapping.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class ValueMapping {
* @param {SpecificData} specificData
* @return {Array<MapValueResult>}
*/
mapValue(valueIn, specificData) {
mapValue(valueIn, specificData,secondaryFlow) {
try {
let valueInString = valueIn.toString();
if (specificData.ignoreCase == true) {
Expand All @@ -20,7 +20,17 @@ class ValueMapping {
if (specificData.ignoreAccent == true) {
valueInString = this.normalize(valueInString);
}
return this.arrays.flatMap(specificData.mappingTable, atomicMapping => {
let mappingTable;
if(secondaryFlow){
mappingTable = secondaryFlow.map(sf=>({
flowValue : sf.in,
replacementValue: sf.out
}))
}else{
mappingTable=specificData.mappingTable;
}
// console.log(mappingTable)
return this.arrays.flatMap(mappingTable, atomicMapping => {
let flowValue = atomicMapping.flowValue;
if (specificData.ignoreCase == true) {
flowValue = flowValue.toUpperCase();
Expand Down Expand Up @@ -65,7 +75,7 @@ class ValueMapping {
* @param {SpecificData} specificData
* @return {MapValuesResult}
*/
mapValues(source, specificData) {
mapValues(source, specificData,secondaryFlow) {

if (source === undefined || source === null) {
return {
Expand All @@ -75,11 +85,11 @@ class ValueMapping {
}
} else if (Array.isArray(source)) {
return {
data: this.arrays.flatMap(source, valueIn => this.mapValue(valueIn, specificData))
data: this.arrays.flatMap(source, valueIn => this.mapValue(valueIn, specificData,secondaryFlow))
}
} else {

const result = this.mapValue(source, specificData);
const result = this.mapValue(source, specificData,secondaryFlow);
// console.log('result',result);
return {
data: result
Expand All @@ -88,7 +98,9 @@ class ValueMapping {
}

pull(data, flowData) {
return Promise.resolve(this.mapValues(flowData[0].data, data.specificData))
const primaryFlow = flowData.find(f=>f.targetInput==undefined)?.data;
const secondaryFlow = flowData.find(f=>f.targetInput=='second')?.data;
return Promise.resolve(this.mapValues(primaryFlow, data.specificData,secondaryFlow))
}
}

Expand Down
Loading

0 comments on commit 1a6793b

Please sign in to comment.