Skip to content

Commit

Permalink
pending api call
Browse files Browse the repository at this point in the history
  • Loading branch information
simon louvet committed Apr 10, 2024
1 parent 33be484 commit 7a127ce
Show file tree
Hide file tree
Showing 3 changed files with 346 additions and 380 deletions.
6 changes: 3 additions & 3 deletions main/server/services/technicalComponentDirectory.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,16 @@ module.exports = {

initialise: function (router, unSafeRouteur,engineTracer) {
// console.log('initialise')
this.restApiPost.initialise(unSafeRouteur,engineTracer) // NO SECURE CHANGE ROUTER
this.restApiGet.initialise(unSafeRouteur,engineTracer) // NO SECURE CHANGE ROUTER
// this.restApiPost.initialise(unSafeRouteur,engineTracer) // NO SECURE CHANGE ROUTER
// this.restApiGet.initialise(unSafeRouteur,engineTracer) // NO SECURE CHANGE ROUTER
this.httpProvider.initialise(unSafeRouteur,engineTracer)
this.upload.initialise(router,engineTracer)
this.cacheNosql.initialise(router,engineTracer) // NO SECURE CHANGE ROUTER
},

setAmqp : function (channel){
// console.log('setAmqp')
this.restApiPost.setAmqp(channel);
this.httpProvider.setAmqp(channel);
this.upload.setAmqp(channel)
}
}
202 changes: 84 additions & 118 deletions main/server/workspaceComponentInitialize/httpProvider.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,30 @@ class HttpProvider {
} = require("path-to-regexp");
this.pathToRegexp = pathToRegexp;
this.pendingWork= {};
this.pendingCall= {};
this.currentCall= {};
this.amqpConnection;
}

setAmqp(amqpConnection){
// console.log('set AMQP')
this.amqpConnection=amqpConnection;
amqpConnection.consume('process-persist', (msg) => {
const messageObject = JSON.parse(msg.content.toString())
const pendingWork = this.pendingWork[messageObject.tracerId||messageObject.processId]
if(pendingWork?.component == messageObject.componentId){
pendingWork.frag = messageObject.frag;
amqpConnection.consume('process-persist', async (msg) => {
const messageObject = JSON.parse(msg.content.toString());
const tracerId = messageObject.tracerId||messageObject.processId;
const pendingWork = this.pendingWork[tracerId];
const triggerComponentId= pendingWork?.component?.specificData?.responseComponentId||pendingWork?.component._id;
if(triggerComponentId == messageObject.componentId){
// pendingWork.frag = messageObject.frag;
const dataResponse = await this.fragment_lib.getWithResolutionByBranch(messageObject.frag);
this.sendResult(pendingWork?.component, dataResponse, pendingWork.res);
// console.log('->undefined')
delete this.pendingWork[tracerId];
delete this.currentCall[pendingWork.component._id.toString()];
this.pop(pendingWork.component._id.toString());
}


}, {
noAck: true
})
Expand All @@ -56,17 +67,26 @@ class HttpProvider {
const pendingWork = this.pendingWork[messageObject.tracerId||messageObject._id]
if(pendingWork){
pendingWork.process = messageObject._id;

}
}, {
noAck: true
})

amqpConnection.consume('process-error', (msg) => {
const messageObject = JSON.parse(msg.content.toString())
const pendingWork = this.pendingWork[messageObject.tracerId||messageObject._id]
const tracerId=messageObject.tracerId||messageObject._id;
const pendingWork = this.pendingWork[tracerId]
if(pendingWork){
pendingWork.error = messageObject._id;
pendingWork.res.status(500).send({
error:'engine error'
})
delete this.pendingWork[tracerId];
delete this.currentCall[pendingWork.component._id.toString()];
this.pop(pendingWork.component._id.toString());
}

}, {
noAck: true
})
Expand All @@ -75,8 +95,8 @@ class HttpProvider {

initialise(router,engineTracer) {

router.all('*', async (req, res, next) => {

router.all('*', async (req, res, next) => {
// console.log('pendingWork',this.pendingWork);
// console.log(req)
const urlRequiered = req.params[0].split('/')[1];
Expand All @@ -95,6 +115,7 @@ class HttpProvider {
req.setTimeout(0);
let keys = []
let regexp = this.pathToRegexp(component.specificData.url, keys);

//convert query url variable to query properties
if (regexp.test(urlRequieredFull)) {
let values = regexp.exec(urlRequieredFull);
Expand All @@ -115,129 +136,74 @@ class HttpProvider {
// console.log('NO MATH!!');
}

// console.log('req.body',req.body);

const worksapce = await this.workspace_lib.get_workspace_simple(component.workspaceId)

const version = worksapce.engineVersion==undefined||worksapce.engineVersion=='default'?'v1':worksapce.engineVersion;
const callStack=this.pendingCall[component._id];
const callContent = {
queryParams: {
query: req.query,
body: req.body,
headers: req.headers,
method :req.method
},
component : component,
res:res
}

// console.log('VERSION',version)
if (MODE=='HTTP'){
// console.log('CALL Direct HTTP')
const versionUrl = `${this.config.engineUrl}/${version}/work-ask/${component._id}`
// console.log('versionUrl',this.config.engineUrl + versionUrl + component._id);
this.request.post(versionUrl, {
body: {
queryParams: {
query: req.query,
body: req.body,
headers: req.headers,
method :req.method
},
pushData: req.body
},
json: true
}
// eslint-disable-next-line handle-callback-err
, (err, data) => {
if (Array.isArray(callStack)){
callStack.push(callContent);
} else{
this.pendingCall[component._id]=[callContent]
}
this.pop(component._id.toString());

try {
if (err) {
console.error("restpiIPost request error", err);
res.status(500).send(err)
} else {
if (data.statusCode != 200) {
res.status(500).send({
engineResponse: data.body
})
} else {
if(data.body.data){
this.sendResult(component, data.body.data, res)
}else {
// engineTracer.pendingProcess.push(data.body.processId);
this.pendingWork[data.body.processId]={component :component._id};
let counter=0
const intervalId = setInterval(async () => {
if (this.pendingWork[data.body.processId].frag){
clearInterval(intervalId);
// res.send(this.pendingWork[data.body.processId]);
const dataResponse = await this.fragment_lib.getWithResolutionByBranch(this.pendingWork[data.body.processId].frag);
this.sendResult(component, dataResponse, res)
}else{
// console.log('waiting');
}
}, 100);
}
}
}
} catch (e) {
console.log('api error after engine call', e);
res.send(new Error(e.message))
}
});
}else if (MODE=='AMQP'){
// console.log('CALL AMQP')
const tracerId = uuidv4();
const workParams={
tracerId ,
id : component._id,
queryParams: {
query: req.query,
body: req.body,
headers: req.headers,
method :req.method
},
// pushData: req.body
}
this.pendingWork[tracerId] = {
component :component._id
}
// console.log(this.amqpConnection)
this.amqpConnection.sendToQueue(
'work-ask',
Buffer.from(JSON.stringify(workParams)),
null,

(err, ok) => {
if (err !== null) {
console.error('Erreur lors de l\'envoi du message :', err);
res.status(500).send({
error: 'AMQP server no connected'
})
} else {
// console.log(`Message envoyé à la file `);
// res.send(workParams);
}
}
)
// let counter=1;
const intervalId = setInterval(async () => {
if (this.pendingWork[tracerId].frag){
clearInterval(intervalId);
const dataResponse = await this.fragment_lib.getWithResolutionByBranch(this.pendingWork[tracerId].frag);
this.sendResult(component, dataResponse, res)

} else if (this.pendingWork[tracerId].error){
clearInterval(intervalId);
res.status(500).send({
error:'engine error'
})
}else{
// console.log('waiting');
// counter++;
}
}, 100);
}
} else {
res.status(404).send('no API for this url');
}
} catch (e) {
console.log(e);
res.status(404).send('no API for this url');
res.status(500).send('API error');
}
})
}

pop(componentId){
const callStack = this.pendingCall[componentId];
// console.log(callStack.length);
if(!this.currentCall[componentId] && Array.isArray(callStack) && callStack.length>0){
this.currentCall[componentId]=true;
const currentCallItem = callStack.shift();
const tracerId = uuidv4();
const workParams={
tracerId ,
id : currentCallItem.component._id,
queryParams: currentCallItem.queryParams
}
this.pendingWork[tracerId] = {
component :currentCallItem.component,
res : currentCallItem.res
}

this.amqpConnection.sendToQueue(
'work-ask',
Buffer.from(JSON.stringify(workParams)),
null,
(err, ok) => {
if (err !== null) {
console.error('Erreur lors de l\'envoi du message :', err);
res.status(500).send({
error: 'AMQP server no connected'
})
} else {
// console.log(`Message envoyé à la file `);
// res.send(workParams);
}
}
)
}


}

sendResult(component, dataToSend, res) {
if (component.specificData != undefined) { // exception in previous promise
if (component.specificData.contentType != undefined && component.specificData.contentType != '') {
Expand Down
Loading

0 comments on commit 7a127ce

Please sign in to comment.