diff --git a/app/routes/v2/clusters.js b/app/routes/v2/clusters.js index 26d7a4a81..ba189d114 100644 --- a/app/routes/v2/clusters.js +++ b/app/routes/v2/clusters.js @@ -26,6 +26,7 @@ const axios = require('axios'); var glob = require('glob-promise'); var fs = require('fs'); const mongoSanitize = require('express-mongo-sanitize'); +const pLimit = require('p-limit'); const verifyAdminOrgKey = require('../../utils/orgs.js').verifyAdminOrgKey; const getCluster = require('../../utils/cluster.js').getCluster; @@ -195,193 +196,197 @@ const updateClusterResources = async (req, res, next) => { const Resources = req.db.collection('resources'); const Stats = req.db.collection('resourceStats'); - for (let resource of resources) { - const type = resource['type'] || 'other'; - switch (type.toUpperCase()) { - case 'POLLED': - case 'MODIFIED': - case 'ADDED': { - let beginTime = Date.now(); - const resourceHash = buildHashForResource(resource.object, req.org); - let dataStr = JSON.stringify(resource.object); - let selfLink; - if(resource.object.metadata && resource.object.metadata.annotations && resource.object.metadata.annotations.selfLink){ - selfLink = resource.object.metadata.annotations.selfLink; - } else { - selfLink = resource.object.metadata.selfLink; - } - const key = { - org_id: req.org._id, - cluster_id: req.params.cluster_id, - selfLink: selfLink - }; - let searchableDataObj = buildSearchableDataForResource(req.org, resource.object, { clusterId }); - - if (searchableDataObj.kind == 'RemoteResource' && searchableDataObj.children && searchableDataObj.children.length > 0) { - // if children arrives earlier than this RR without subscription_id, update children's subscription_id - const childSearchKey = { + const limit = pLimit(10); + await Promise.all(resources.map(async (resource) => { + return limit(async () => { + const type = resource['type'] || 'other'; + switch (type.toUpperCase()) { + case 'POLLED': + case 'MODIFIED': + case 'ADDED': { + let beginTime = Date.now(); + const resourceHash = buildHashForResource(resource.object, req.org); + let dataStr = JSON.stringify(resource.object); + let selfLink; + if(resource.object.metadata && resource.object.metadata.annotations && resource.object.metadata.annotations.selfLink){ + selfLink = resource.object.metadata.annotations.selfLink; + } else { + selfLink = resource.object.metadata.selfLink; + } + const key = { org_id: req.org._id, cluster_id: req.params.cluster_id, - selfLink: {$in: searchableDataObj.children}, - 'searchableData.subscription_id': {$exists: false}, + selfLink: selfLink + }; + let searchableDataObj = buildSearchableDataForResource(req.org, resource.object, { clusterId }); + + if (searchableDataObj.kind == 'RemoteResource' && searchableDataObj.children && searchableDataObj.children.length > 0) { + // if children arrives earlier than this RR without subscription_id, update children's subscription_id + const childSearchKey = { + org_id: req.org._id, + cluster_id: req.params.cluster_id, + selfLink: {$in: searchableDataObj.children}, + 'searchableData.subscription_id': {$exists: false}, + deleted: false + }; + let start = Date.now(); + const childResource = await Resources.findOne(childSearchKey); + req.log.info({ 'milliseconds': Date.now() - start, 'operation': 'updateClusterResources:Resources.findOne.childResource', 'data': childSearchKey }, 'satcon-performance'); + if (childResource) { + const subscription_id = searchableDataObj['annotations["deploy_razee_io_clustersubscription"]']; + req.log.debug({key, subscription_id}, `Updating children's subscription_id to ${subscription_id} for parent key.`); + var childStart = Date.now(); + Resources.updateMany( childSearchKey, + {$set: {'searchableData.subscription_id': subscription_id},$currentDate: { updated: true }}, {}); + req.log.info({ 'milliseconds': Date.now() - childStart, 'operation': 'updateClusterResources:Resources.updateMany', 'data': childSearchKey }, 'satcon-performance'); + } + } + const rrSearchKey = { + org_id: req.org._id, + cluster_id: req.params.cluster_id, + 'searchableData.kind': 'RemoteResource', + 'searchableData.children': selfLink, deleted: false }; let start = Date.now(); - const childResource = await Resources.findOne(childSearchKey); - req.log.info({ 'milliseconds': Date.now() - start, 'operation': 'updateClusterResources:Resources.findOne.childResource', 'data': childSearchKey }, 'satcon-performance'); - if (childResource) { - const subscription_id = searchableDataObj['annotations["deploy_razee_io_clustersubscription"]']; - req.log.debug({key, subscription_id}, `Updating children's subscription_id to ${subscription_id} for parent key.`); - var childStart = Date.now(); - Resources.updateMany( childSearchKey, - {$set: {'searchableData.subscription_id': subscription_id},$currentDate: { updated: true }}, {}); - req.log.info({ 'milliseconds': Date.now() - childStart, 'operation': 'updateClusterResources:Resources.updateMany', 'data': childSearchKey }, 'satcon-performance'); + const remoteResource = await Resources.findOne(rrSearchKey); + req.log.info({ 'milliseconds': Date.now() - start, 'operation': 'updateClusterResources:Resources.findOne.remoteResource', 'data': rrSearchKey}, 'satcon-performance'); + if(remoteResource) { + searchableDataObj['subscription_id'] = remoteResource.searchableData['annotations["deploy_razee_io_clustersubscription"]']; + searchableDataObj['searchableExpression'] = searchableDataObj['searchableExpression'] + ':' + searchableDataObj['subscription_id']; } - } - const rrSearchKey = { - org_id: req.org._id, - cluster_id: req.params.cluster_id, - 'searchableData.kind': 'RemoteResource', - 'searchableData.children': selfLink, - deleted: false - }; - let start = Date.now(); - const remoteResource = await Resources.findOne(rrSearchKey); - req.log.info({ 'milliseconds': Date.now() - start, 'operation': 'updateClusterResources:Resources.findOne.remoteResource', 'data': rrSearchKey}, 'satcon-performance'); - if(remoteResource) { - searchableDataObj['subscription_id'] = remoteResource.searchableData['annotations["deploy_razee_io_clustersubscription"]']; - searchableDataObj['searchableExpression'] = searchableDataObj['searchableExpression'] + ':' + searchableDataObj['subscription_id']; - } - const searchableDataHash = buildSearchableDataObjHash(searchableDataObj); - - start = Date.now(); - const currentResource = await Resources.findOne(key); - req.log.info({ 'milliseconds': Date.now() - start, 'operation': 'updateClusterResources:Resources.findOne.currentResource', 'data': key}, 'satcon-performance'); - const hasSearchableDataChanges = (currentResource && searchableDataHash != _.get(currentResource, 'searchableDataHash')); - const pushCmd = buildPushObj(searchableDataObj, _.get(currentResource, 'searchableData', null)); - if (req.s3 && (!currentResource || resourceHash !== currentResource.hash)) { - let start = Date.now(); - dataStr = await pushToS3(req, key, searchableDataHash, dataStr); - req.log.info({ 'milliseconds': Date.now() - start, 'operation': 'updateClusterResources:pushToS3', 'data': key}, 'satcon-performance'); - } - var changes = null; - var options = {}; - if(currentResource){ - // if obj already in db - if (resourceHash === currentResource.hash && !hasSearchableDataChanges){ - // if obj in db and nothing has changed - changes = { - $set: { deleted: false }, - $currentDate: { updated: true } - }; + const searchableDataHash = buildSearchableDataObjHash(searchableDataObj); + + start = Date.now(); + const currentResource = await Resources.findOne(key); + req.log.info({ 'milliseconds': Date.now() - start, 'operation': 'updateClusterResources:Resources.findOne.currentResource', 'data': key}, 'satcon-performance'); + const hasSearchableDataChanges = (currentResource && searchableDataHash != _.get(currentResource, 'searchableDataHash')); + const pushCmd = buildPushObj(searchableDataObj, _.get(currentResource, 'searchableData', null)); + if (req.s3 && (!currentResource || resourceHash !== currentResource.hash)) { + let start = Date.now(); + dataStr = await pushToS3(req, key, searchableDataHash, dataStr); + req.log.info({ 'milliseconds': Date.now() - start, 'operation': 'updateClusterResources:pushToS3', 'data': key}, 'satcon-performance'); + } + var changes = null; + var options = {}; + if(currentResource){ + // if obj already in db + if (resourceHash === currentResource.hash && !hasSearchableDataChanges){ + // if obj in db and nothing has changed + changes = { + $set: { deleted: false }, + $currentDate: { updated: true } + }; + } + else{ + const toSet = { deleted: false, hash: resourceHash, data: dataStr, searchableData: searchableDataObj, searchableDataHash: searchableDataHash }; + if(hasSearchableDataChanges) { + // if any of the searchable attrs has changes, then save a new yaml history obj (for diffing in the ui) + let start = Date.now(); + const histId = await addResourceYamlHistObj(req, req.org._id, clusterId, selfLink, dataStr); + req.log.info({ 'milliseconds': Date.now() - start, 'operation': 'updateClusterResources:addResourceYamlHistObj:hasSearchableDataChanges', 'data': clusterId}, 'satcon-performance'); + toSet['histId'] = histId; + } + // if obj in db and theres changes to save + changes = { + $set: toSet, + $currentDate: { updated: true, lastModified: true }, + ...pushCmd + }; + } } else{ - const toSet = { deleted: false, hash: resourceHash, data: dataStr, searchableData: searchableDataObj, searchableDataHash: searchableDataHash }; - if(hasSearchableDataChanges) { - // if any of the searchable attrs has changes, then save a new yaml history obj (for diffing in the ui) - let start = Date.now(); - const histId = await addResourceYamlHistObj(req, req.org._id, clusterId, selfLink, dataStr); - req.log.info({ 'milliseconds': Date.now() - start, 'operation': 'updateClusterResources:addResourceYamlHistObj:hasSearchableDataChanges', 'data': clusterId}, 'satcon-performance'); - toSet['histId'] = histId; + // adds the yaml hist item too + let start = Date.now(); + const histId = await addResourceYamlHistObj(req, req.org._id, clusterId, selfLink, dataStr); + req.log.info({ 'milliseconds': Date.now() - start, 'operation': 'updateClusterResources:addResourceYamlHistObj:newResource', 'data': clusterId}, 'satcon-performance'); + + // if obj not in db, then adds it + const total = await Resources.count({org_id: req.org._id, deleted: false}); + if (total >= RESOURCE_LIMITS.MAX_TOTAL ) { + res.status(400).send({error: 'Too many resources are registered under this organization.'}); + return; } - // if obj in db and theres changes to save changes = { - $set: toSet, - $currentDate: { updated: true, lastModified: true }, + $set: { deleted: false, hash: resourceHash, histId, data: dataStr, searchableData: searchableDataObj, searchableDataHash: searchableDataHash }, + $currentDate: { created: true, updated: true, lastModified: true }, ...pushCmd }; + options = { upsert: true }; + start = Date.now(); + Stats.updateOne({ org_id: req.org._id }, { $inc: { deploymentCount: 1 } }, { upsert: true }); + req.log.info({ 'milliseconds': Date.now() - start, 'operation': 'updateClusterResources:Stats.updateOne', 'data': req.org._id}, 'satcon-performance'); } + + start = Date.now(); + const result = await Resources.updateOne(key, changes, options); + req.log.info({ 'milliseconds': Date.now() - start, 'operation': 'updateClusterResources:Resources.updateOne.newResource', 'data': key}, 'satcon-performance'); + // publish notification to graphql + if (result) { + let resourceId = null; + let resourceCreated = Date.now; + if (result.upsertedId) { + resourceId = result.upsertedId._id; + } else if (currentResource) { + resourceId = currentResource._id; + resourceCreated = currentResource.created; + } + if (resourceId) { + pubSub.resourceChangedFunc( + {_id: resourceId, data: dataStr, created: resourceCreated, + deleted: false, org_id: req.org._id, cluster_id: req.params.cluster_id, selfLink: selfLink, + hash: resourceHash, searchableData: searchableDataObj, searchableDataHash: searchableDataHash}); + } + } + req.log.info({ 'milliseconds': Date.now() - beginTime, 'operation': 'updateClusterResources', 'data': 'POLLED,MODIFIED,ADDED' }, 'satcon-performance'); + break; } - else{ - // adds the yaml hist item too - let start = Date.now(); - const histId = await addResourceYamlHistObj(req, req.org._id, clusterId, selfLink, dataStr); - req.log.info({ 'milliseconds': Date.now() - start, 'operation': 'updateClusterResources:addResourceYamlHistObj:newResource', 'data': clusterId}, 'satcon-performance'); - - // if obj not in db, then adds it - const total = await Resources.count({org_id: req.org._id, deleted: false}); - if (total >= RESOURCE_LIMITS.MAX_TOTAL ) { - res.status(400).send({error: 'Too many resources are registered under this organization.'}); - return; + case 'DELETED': { + let beginTime = Date.now(); + let selfLink; + if(resource.object.metadata && resource.object.metadata.annotations && resource.object.metadata.annotations.selfLink){ + selfLink = resource.object.metadata.annotations.selfLink; + } else { + selfLink = resource.object.metadata.selfLink; } - changes = { - $set: { deleted: false, hash: resourceHash, histId, data: dataStr, searchableData: searchableDataObj, searchableDataHash: searchableDataHash }, - $currentDate: { created: true, updated: true, lastModified: true }, - ...pushCmd + let dataStr = JSON.stringify(resource.object); + const key = { + org_id: req.org._id, + cluster_id: req.params.cluster_id, + selfLink: selfLink }; - options = { upsert: true }; - start = Date.now(); - Stats.updateOne({ org_id: req.org._id }, { $inc: { deploymentCount: 1 } }, { upsert: true }); - req.log.info({ 'milliseconds': Date.now() - start, 'operation': 'updateClusterResources:Stats.updateOne', 'data': req.org._id}, 'satcon-performance'); - } - - start = Date.now(); - const result = await Resources.updateOne(key, changes, options); - req.log.info({ 'milliseconds': Date.now() - start, 'operation': 'updateClusterResources:Resources.updateOne.newResource', 'data': key}, 'satcon-performance'); - // publish notification to graphql - if (result) { - let resourceId = null; - let resourceCreated = Date.now; - if (result.upsertedId) { - resourceId = result.upsertedId._id; - } else if (currentResource) { - resourceId = currentResource._id; - resourceCreated = currentResource.created; + const searchableDataObj = buildSearchableDataForResource(req.org, resource.object, { clusterId }); + const searchableDataHash = buildSearchableDataObjHash(searchableDataObj); + const currentResource = await Resources.findOne(key); + const pushCmd = buildPushObj(searchableDataObj, _.get(currentResource, 'searchableData', null)); + if (req.s3) { + let start = Date.now(); + dataStr = await pushToS3(req, key, searchableDataHash, dataStr); + req.log.info({ 'milliseconds': Date.now() - start, 'operation': 'updateClusterResources:pushToS3:Deleted', 'data': key}, 'satcon-performance'); } - if (resourceId) { - pubSub.resourceChangedFunc( - {_id: resourceId, data: dataStr, created: resourceCreated, - deleted: false, org_id: req.org._id, cluster_id: req.params.cluster_id, selfLink: selfLink, - hash: resourceHash, searchableData: searchableDataObj, searchableDataHash: searchableDataHash}); + if (currentResource) { + let start = Date.now(); + await Resources.updateOne( + key, { + $set: { deleted: true, data: dataStr, searchableData: searchableDataObj, searchableDataHash: searchableDataHash }, + $currentDate: { updated: true }, + ...pushCmd + } + ); + req.log.info({ 'milliseconds': Date.now() - start, 'operation': 'updateClusterResources:Resources.updateOne.Deleted:', 'data': key}, 'satcon-performance'); + await addResourceYamlHistObj(req, req.org._id, clusterId, selfLink, ''); + pubSub.resourceChangedFunc({ _id: currentResource._id, created: currentResource.created, deleted: true, org_id: req.org._id, cluster_id: req.params.cluster_id, selfLink: selfLink, searchableData: searchableDataObj, searchableDataHash: searchableDataHash}); } + req.log.info({ 'milliseconds': Date.now() - beginTime, 'operation': 'updateClusterResources', 'data': 'DELETED' }, 'satcon-performance'); + break; } - req.log.info({ 'milliseconds': Date.now() - beginTime, 'operation': 'updateClusterResources', 'data': 'POLLED,MODIFIED,ADDED' }, 'satcon-performance'); - break; - } - case 'DELETED': { - let beginTime = Date.now(); - let selfLink; - if(resource.object.metadata && resource.object.metadata.annotations && resource.object.metadata.annotations.selfLink){ - selfLink = resource.object.metadata.annotations.selfLink; - } else { - selfLink = resource.object.metadata.selfLink; - } - let dataStr = JSON.stringify(resource.object); - const key = { - org_id: req.org._id, - cluster_id: req.params.cluster_id, - selfLink: selfLink - }; - const searchableDataObj = buildSearchableDataForResource(req.org, resource.object, { clusterId }); - const searchableDataHash = buildSearchableDataObjHash(searchableDataObj); - const currentResource = await Resources.findOne(key); - const pushCmd = buildPushObj(searchableDataObj, _.get(currentResource, 'searchableData', null)); - if (req.s3) { - let start = Date.now(); - dataStr = await pushToS3(req, key, searchableDataHash, dataStr); - req.log.info({ 'milliseconds': Date.now() - start, 'operation': 'updateClusterResources:pushToS3:Deleted', 'data': key}, 'satcon-performance'); - } - if (currentResource) { - let start = Date.now(); - await Resources.updateOne( - key, { - $set: { deleted: true, data: dataStr, searchableData: searchableDataObj, searchableDataHash: searchableDataHash }, - $currentDate: { updated: true }, - ...pushCmd - } - ); - req.log.info({ 'milliseconds': Date.now() - start, 'operation': 'updateClusterResources:Resources.updateOne.Deleted:', 'data': key}, 'satcon-performance'); - await addResourceYamlHistObj(req, req.org._id, clusterId, selfLink, ''); - pubSub.resourceChangedFunc({ _id: currentResource._id, created: currentResource.created, deleted: true, org_id: req.org._id, cluster_id: req.params.cluster_id, selfLink: selfLink, searchableData: searchableDataObj, searchableDataHash: searchableDataHash}); + default: { + throw new Error(`Unsupported event ${resource.type}`); } - req.log.info({ 'milliseconds': Date.now() - beginTime, 'operation': 'updateClusterResources', 'data': 'DELETED' }, 'satcon-performance'); - break; - } - default: { - throw new Error(`Unsupported event ${resource.type}`); } - } - } + }); + })); + res.status(200).send('Thanks'); } catch (err) { req.log.error(err.message);