diff --git a/.github/workflows/syntax-validation-test.yaml b/.github/workflows/syntax-validation-test.yaml new file mode 100644 index 00000000..bce82a9f --- /dev/null +++ b/.github/workflows/syntax-validation-test.yaml @@ -0,0 +1,28 @@ +name: "Syntax validation tests" +on: [workflow_dispatch, pull_request] + +jobs: + package-version-test: + runs-on: ubuntu-latest + strategy: + matrix: + include: # one syntax-check job per function build directory + - dir: 'BlockBlobReader/target/consumer_build' + - dir: 'BlockBlobReader/target/dlqprocessor_build' + - dir: 'BlockBlobReader/target/producer_build' + - dir: 'AppendBlobReader/target/producer_build' + - dir: 'AppendBlobReader/target/appendblob_producer_build' + - dir: 'AppendBlobReader/target/consumer_build' + - dir: 'EventHubs/target/metrics_build' + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Setup node + uses: actions/setup-node@v4 + with: + node-version: 18 + + - working-directory: ${{ matrix.dir }} + run: | + find ./ -name '*.js' | xargs -n 1 node --check diff --git a/AppendBlobReader/src/appendblobproducer.js b/AppendBlobReader/src/appendblobproducer.js index eae7affa..5554df18 --- a/AppendBlobReader/src/appendblobproducer.js +++ b/AppendBlobReader/src/appendblobproducer.js @@ -134,10 +134,10 @@ function queryFiles(tableQuery, context) { allentities.push(entity); } - resolve(allentities); + return resolve(allentities); } catch (error) { context.log.error(`Error while fetching queryFiles: ${JSON.stringify(error)}`); - reject(error); + return reject(error); } }) } @@ -199,16 +199,20 @@ function batchUpdateOffsetTable(context, allentities, mode) { * and thus there is a chance that the file may get locked for a long time so the below function automatically * releases the lock after a threshold is breached.
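 * For example, if the producer observes a maximum queueing delay of 45 minutes across tracked files, the unlock threshold below is raised from the default 30 minutes to 45 minutes; smaller delays keep the 30 minute default.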
*/ -function getLockedEntitiesExceedingThreshold(context) { +function getLockedEntitiesExceedingThreshold(context, maxQueueingDelay) { - var maxlockThresholdMin = 15; + var maxlockThresholdMin = 30; + if (maxQueueingDelay > 30) { + context.log("WARNING maxQueueingDelay exceeding 30 minutes"); + maxlockThresholdMin = maxQueueingDelay; + } var dateVal = new Date(); dateVal.setMinutes(Math.max(0, dateVal.getMinutes() - maxlockThresholdMin)); var lockedFileQuery = `done eq ${true} and blobType eq '${'AppendBlob'}' and offset ge ${0} and lastEnqueLockTime le datetime'${dateVal.toISOString()}'` return queryFiles(lockedFileQuery, context).then(function (allentities) { - context.log("AppendBlob Locked Files exceeding maxlockThresholdMin: " + allentities.length); + context.log(`AppendBlob Locked Files exceeding maxlockThresholdMin of ${maxlockThresholdMin}: ${allentities.length}`); var unlockedEntities = allentities.map(function (entity) { - context.log("Unlocking Append Blob File with rowKey: %s lastEnqueLockTime: %s", entity.rowKey, entity.lastEnqueLockTime); + context.log.verbose("Unlocking Append Blob File with rowKey: %s lastEnqueLockTime: %s", entity.rowKey, entity.lastEnqueLockTime); return getunLockedEntity(entity); }); return unlockedEntities; @@ -278,16 +282,35 @@ function setBatchSizePerStorageAccount(newFiletasks) { filesPerStorageAccountCount[task.storageName] += 1; } }; - let MAX_READ_API_LIMIT_PER_SEC = 15000; - let MAX_GET_BLOB_REQUEST_PER_INVOKE = 25; + let MAX_READ_API_LIMIT_PER_SEC = 20000; // storage account read api limit + let MAX_GET_BLOB_REQUEST_PER_INVOKE = 50; // 50*4MB = 200MB max batch size for (let idx = 0; idx < newFiletasks.length; idx += 1) { task = newFiletasks[idx]; - let apiCallPerFile = Math.max(1, Math.floor(MAX_READ_API_LIMIT_PER_SEC / filesPerStorageAccountCount[task.storageName])); - task.batchSize = Math.min(MAX_GET_BLOB_REQUEST_PER_INVOKE, apiCallPerFile) * 12 * 1024 * 1024; + let apiCallPerFile = Math.max(1, Math.ceil(MAX_READ_API_LIMIT_PER_SEC / filesPerStorageAccountCount[task.storageName])); + task.batchSize = Math.min(MAX_GET_BLOB_REQUEST_PER_INVOKE, apiCallPerFile) * 4 * 1024 * 1024; // single request fetches 4MB } return newFiletasks; } +/** + * Returns the difference between date_a and date_b in minutes. + * Returns null when either date is missing or invalid. + */ +function getDateDifferenceInMinutes(date_a, date_b) { + + try { + if (!(date_a && date_b)) { + return null; + } + var dateVal_a = new Date(date_a); + var dateVal_b = new Date(date_b); + var diffMs = (dateVal_b - dateVal_a); + var diffMins = Math.round(diffMs / 60000); + return diffMins; + } catch { + return null; + } +} /** *First it fetches the unlocked append blob files rows and creates tasks for them in Event Hub * @@ -303,11 +326,13 @@ function getTasksForUnlockedFiles(context) { // fetching unlocked files which were not enqueued in last 5 minutes var existingFileQuery = `done eq ${false} and blobType eq '${'AppendBlob'}' and offset ge ${0} and ( not (lastEnqueLockTime lt '') or lastEnqueLockTime le datetime'${dateVal.toISOString()}')` return new Promise(function (resolve, reject) { - queryFiles(existingFileQuery, context).then(function (allentities) { + return queryFiles(existingFileQuery, context).then(function (allentities) { var newFiletasks = []; var archivedFiles = []; var newFileEntities = []; var lockedEntities = []; + var maxQueueingDelay = 0; + let currentQueueingDelay = null; allentities.forEach(function (entity) { if (isAppendBlobArchived(context, entity)) { archivedFiles.push(getunLockedEntity(entity)); @@ -320,6 +345,9 @@ function
getTasksForUnlockedFiles(context) { } context.log.verbose("Creating task for file: " + entity.rowKey); } + + maxQueueingDelay = Math.max(maxQueueingDelay, getDateDifferenceInMinutes(entity.lastEnqueLockTime, entity.updatedate)); + }); newFileEntities = getFixedNumberOfEntitiesbyEnqueTime(context, newFileEntities) newFileEntities.forEach(function (entity) { @@ -328,10 +356,9 @@ function getTasksForUnlockedFiles(context) { }); newFiletasks = setBatchSizePerStorageAccount(newFiletasks) context.log("New File Tasks created: " + newFiletasks.length + " AppendBlob Archived Files: " + archivedFiles.length); - resolve([newFiletasks, archivedFiles, lockedEntities]); + return resolve([newFiletasks, archivedFiles, lockedEntities, maxQueueingDelay]); }).catch(function (error) { - context.log.error(`Error in getting new tasks, Error: ${JSON.stringify(error)}`); - reject(error); + return reject(error); }); }); } @@ -350,10 +377,11 @@ function PollAppendBlobFiles(context) { var newFiletasks = r[0]; var archivedRowEntities = r[1]; var entitiesToUpdate = r[2]; + var maxQueueingDelay = r[3]; context.bindings.tasks = context.bindings.tasks.concat(newFiletasks); context.log.verbose("new file tasks", newFiletasks); var batch_promises = [ - getLockedEntitiesExceedingThreshold(context).then(function (unlockedEntities) { + getLockedEntitiesExceedingThreshold(context, maxQueueingDelay).then(function (unlockedEntities) { // setting lock for new tasks and unsetting lock for old tasks entitiesToUpdate = entitiesToUpdate.concat(unlockedEntities); return batchUpdateOffsetTable(context, entitiesToUpdate, "insert"); diff --git a/AppendBlobReader/src/appendblobreaderdeploy.json b/AppendBlobReader/src/appendblobreaderdeploy.json index 626c9c2b..b800726a 100644 --- a/AppendBlobReader/src/appendblobreaderdeploy.json +++ b/AppendBlobReader/src/appendblobreaderdeploy.json @@ -537,10 +537,6 @@ "name": "AzureEventHubConnectionString", "value": "[concat(listkeys(resourceId('Microsoft.EventHub/namespaces/authorizationRules', parameters('namespaces_BlobReaderNamespace_name'),parameters('AuthorizationRules_RootManageSharedAccessKey_EventHub_name')), '2022-10-01-preview').primaryConnectionString,';EntityPath=',parameters('eventhubs_blobreadereventhub_name'))]" }, - { - "name": "TaskQueueConnectionString", - "value": "[listkeys(resourceId('Microsoft.ServiceBus/namespaces/authorizationRules', parameters('namespaces_blobreadertaskqueue_name'),parameters('AuthorizationRules_RootManageSharedAccessKey_TaskQueue_name')), '2022-10-01-preview').primaryConnectionString]" - }, { "name": "WEBSITE_NODE_DEFAULT_VERSION", "value": "~18" @@ -870,13 +866,13 @@ ] }, { - "type": "microsoft.operationalinsights/workspaces", + "type": "Microsoft.OperationalInsights/workspaces", "apiVersion": "2022-10-01", "name": "[parameters('logAnalyticsWorkspaceName')]", "location": "[parameters('location')]", "properties": { "sku": { - "name": "pergb2018" + "name": "PerGB2018" }, "retentionInDays": 30, "features": { @@ -898,8 +894,11 @@ "properties": { "Application_Type": "web", "applicationId": "[parameters('appInsightsName')]", - "WorkspaceResourceId": "[resourceId('microsoft.operationalinsights/workspaces', parameters('logAnalyticsWorkspaceName'))]" - } + "WorkspaceResourceId": "[resourceId('Microsoft.OperationalInsights/workspaces', parameters('logAnalyticsWorkspaceName'))]" + }, + "dependsOn": [ + "[resourceId('Microsoft.OperationalInsights/workspaces', parameters('logAnalyticsWorkspaceName'))]" + ] } ] -} \ No newline at end of file +} diff --git 
a/AppendBlobReader/src/producer.js b/AppendBlobReader/src/producer.js index 24506d0a..97997865 100644 --- a/AppendBlobReader/src/producer.js +++ b/AppendBlobReader/src/producer.js @@ -99,31 +99,6 @@ function getContentLengthPerBlob(eventHubMessages, allcontentlengths, metadatama }); } -/** - * @param {} PartitionKey - * @param {} RowKey - * @param {} context - * - * retrieves the offset for a row from the table - */ -async function getBlobPointerMap(partitionKey, rowKey, context) { - // Todo Add retries for node migration in cases of timeouts(non 400 & 500 errors) - var statusCode = 200; - try { - var entity = await tableClient.getEntity(partitionKey, rowKey); - //context.log("retreived existing rowkey: " + rowKey) - } catch (err) { - // err object keys : [ 'name', 'code', 'statusCode', 'request', 'response', 'details' ] - if (err.statusCode === 404) { - //context.log("no existing row found, new file scenario for rowkey: " + rowKey) - statusCode = 404; - } else { - throw err; - } - } - //context.log({statusCode: statusCode, entity: entity}); - return { statusCode: statusCode, entity: entity }; -} /** * Update Blob Pointer Map. @@ -131,7 +106,7 @@ async function getBlobPointerMap(partitionKey, rowKey, context) { * @param {Object} entity - The entity object to update or create. * @returns {Promise} - A promise that resolves to the response from updating or creating the entity. */ -async function updateBlobPointerMap(entity) { +async function updateOrCreateBlobPointerMap(entity) { let response; if (entity.options) { let options = entity.options; @@ -149,7 +124,6 @@ async function updateBlobPointerMap(entity) { /** * @param {} PartitionKey * @param {} RowKey - * @param {} sortedcontentlengths * @param {} context * @param {} metadata * @param {} finalcontext @@ -157,54 +131,37 @@ async function updateBlobPointerMap(entity) { * If the contentLength is 0 then it is assumed to be Append Blobs else it is Block Blob * In case of Append blob a row entry is created with negative offset and all the rows with Append blob blob types are polled by append blob producer function * - * In cases of Block blob a task is created in service bus and consumer function consumes the task by download the file using the offset in task metadata and then sending to sumo logic's http endpoint */ -async function createTasksForBlob(partitionKey, rowKey, sortedcontentlengths, context, metadata) { - //context.log("inside createTasksForBlob", partitionKey, rowKey, sortedcontentlengths, metadata); - if (sortedcontentlengths.length === 0) { - return Promise.resolve({ status: "success", message: "No tasks created for rowKey: " + rowKey }); - } +async function createTasksForAppendBlob(partitionKey, rowKey, context, metadata) { + //context.log("inside createTasksForAppendBlob", partitionKey, rowKey, sortedcontentlengths, metadata); + + context.log.verbose("Creating an entry for RowKey: ", rowKey) try { - var retrievedResponse = await getBlobPointerMap(partitionKey, rowKey, context); - //context.log("retrieved blob pointer successsfully for rowkey: " + rowKey + " response: "+ retrievedResponse) + var entity = getEntity(metadata, 0, null); + var response = await updateOrCreateBlobPointerMap(entity); // this will always create + return Promise.resolve({ status: "success", rowKey: rowKey, message: "AppendBlob Entry added for RowKey: " + rowKey }); } catch (err) { - // unable to retrieve offset, hence ingesting whole file from starting byte - let lastoffset = sortedcontentlengths[sortedcontentlengths.length - 1] - 1; - return 
Promise.reject({ status: "failed", rowKey: rowKey, message: "Unable to Retrieve offset for rowKey: " + rowKey + " Error: " + err, lastoffset: lastoffset, currentoffset: -1 }); - } - var currentoffset = retrievedResponse.statusCode === 404 ? -1 : Number(retrievedResponse.entity.offset); - var currentEtag = retrievedResponse.statusCode === 404 ? null : retrievedResponse.entity.etag; - var [tasks, lastoffset] = getNewTask(currentoffset, sortedcontentlengths, metadata); - - if (tasks.length > 0) { // modify offset only when it's been changed - var entity = getEntity(metadata, lastoffset, currentEtag); - try { - var updatedResponse = await updateBlobPointerMap(entity); - //context.log("updated blob pointer successsfully for rowkey: " + rowKey + " response: "+ updatedResponse) - context.bindings.tasks = context.bindings.tasks.concat(tasks); - return Promise.resolve({ status: "success", rowKey: rowKey, message: tasks.length + " Tasks added for rowKey: " + rowKey }); - } catch (err) { - if (err && err.details && err.details.odataError && err.details.odataError.code === "UpdateConditionNotSatisfied" && err.statusCode === 412) { - context.log.verbose("Need to Retry: " + rowKey); - } - return Promise.reject({ status: "failed", rowKey: rowKey, message: "Unable to Update offset for rowKey: " + rowKey + " Error: " + err, lastoffset: lastoffset, currentoffset: currentoffset }); - } - } else if (currentoffset === -1 && lastoffset === -1) { - context.log("Append blob scenario create just an entry RowKey: ", rowKey) - try { - var entity = getEntity(metadata, 0, currentEtag); - var updatedResponse = await updateBlobPointerMap(entity); - context.bindings.tasks = context.bindings.tasks.concat(tasks); - return Promise.resolve({ status: "success", rowKey: rowKey, message: "AppendBlob Entry added for RowKey: " + rowKey }); - } catch (err) { - if (err.code === "UpdateConditionNotSatisfied") { - context.log("Need to Retry: " + rowKey, entity); + + if (err.statusCode === 409 && JSON.stringify(err).includes("EntityAlreadyExists")) { + context.log("AppendBlob Entry exists for RowKey: " + rowKey); + return Promise.resolve({ status: "success", rowKey: rowKey, message: "AppendBlob Entry exists for RowKey: " + rowKey }); + } else if ((err.statusCode === 404) && JSON.stringify(err).includes("TableNotFound")) { + try { + context.log(`Creating table in storage account: ${process.env.TABLE_NAME}`); + await tableClient.createTable(); + await new Promise(resolve => setTimeout(resolve, 10000)); // 10 second wait for the table to be created + var response = await updateOrCreateBlobPointerMap(entity); // this will always create + return Promise.resolve({ status: "success", rowKey: rowKey, message: "AppendBlob Entry added for RowKey: " + rowKey }); + } catch(err) { + return Promise.reject({ status: "failed", rowKey: rowKey, message: `Failed to create table for rowKey: ${rowKey} error: ${JSON.stringify(err)}`}); } - return Promise.reject({ status: "failed", rowKey: rowKey, message: "Unable to Update offset for rowKey: " + rowKey }); + + } else { + return Promise.reject({ status: "failed", rowKey: rowKey, message: `Failed to create AppendBlob Entry for rowKey: ${rowKey} error: ${JSON.stringify(err)}` }); } - } else { - return Promise.resolve({ status: "success", rowKey: rowKey, message: "No tasks created for rowKey: " + rowKey }); + } + } @@ -231,7 +188,7 @@ function getNewTask(currentoffset, sortedcontentlengths, metadata) { /** * Filter messages for AppendBlob. 
- * + * * @param {Array} messages - An array of message objects to filter. * @returns {Array} - An array containing only the messages with AppendBlob type. */ @@ -245,7 +202,7 @@ function filterAppendBlob(messages) { /** * Filter messages by file extension. - * + * * @param {Object} context - The context object for logging or other operations. * @param {Array} messages - An array of message objects to filter. * @returns {Array} - An array containing only the messages with supported file extensions. @@ -275,8 +232,8 @@ module.exports = async function (context, eventHubMessages) { // eventHubMessages = [ // [ // { - // topic: '/subscriptions/c088dc46-d692-42ad-a4b6-9a542d28ad2a/resourceGroups/SumoAuditCollection/providers/Microsoft.Storage/storageAccounts/allbloblogseastus', - // subject: '/blobServices/default/containers/testabb/blobs/dummy_data.csv', + // topic: '/subscriptions/c088dc46-d692-42ad-a4b6-9a542d28ad2a/resourceGroups/testsumosarg290524101050/providers/Microsoft.Storage/storageAccounts/testsa290524101050', + // subject: '/blobServices/default/containers/testcontainer-29-05-24-10-10-50/blobs/test.blob', // eventType: 'Microsoft.Storage.BlobCreated', // id: '05bad26c-b01e-0064-59cf-6a5397068de0', // data: { @@ -286,7 +243,7 @@ module.exports = async function (context, eventHubMessages) { // contentType: 'text/csv', // contentLength: 0, // blobType: 'AppendBlob', - // url: 'https://allbloblogseastus.blob.core.windows.net/testabb/dummy_data.csv', + // url: 'https://testsa290524101050.blob.core.windows.net/testcontainer-29-05-24-10-10-50/test.blob', // sequencer: '00000000000000000000000000010F8A0000000000047e92', // storageDiagnostics: { batchId: 'd7656e84-7006-0036-00cf-6a2f7f000000' } // }, @@ -311,39 +268,39 @@ module.exports = async function (context, eventHubMessages) { var metadatamap = {}; var allcontentlengths = {}; getContentLengthPerBlob(filterMessages, allcontentlengths, metadatamap); - var processed = 0; - context.bindings.tasks = []; + var totalRowsCreated = 0, totalExistingRows = 0; var allRowPromises = []; var totalRows = Object.keys(allcontentlengths).length; var errArr = [], rowKey; for (rowKey in allcontentlengths) { - var sortedcontentlengths = allcontentlengths[rowKey].sort(); // ensuring increasing order of contentlengths var metadata = metadatamap[rowKey]; var partitionKey = metadata.containerName; - allRowPromises.push(sumoutils.p_retryMax(createTasksForBlob, MaxAttempts, RetryInterval, [partitionKey, rowKey, sortedcontentlengths, context, metadata], context).catch((err) => err)); + allRowPromises.push(sumoutils.p_retryMax(createTasksForAppendBlob, MaxAttempts, RetryInterval, [partitionKey, rowKey, context, metadata], context).catch((err) => err)); } + // Fail the function if any one of the files fail to get inserted into FileOffsetMap + await Promise.all(allRowPromises).then((responseValues) => { - //creating duplicate task for file causing an error when update condition is not satisfied in mutiple read and write scenarios for same row key in fileOffSetMap table + for (let response of responseValues) { - processed += 1; if (response.status === "failed") { - context.log.verbose("creating duplicate task since retry failed for rowkey: " + response.rowKey); - var duplicateTask = Object.assign({ - startByte: response.currentoffset + 1, - endByte: response.lastoffset - }, metadatamap[response.rowKey]); - context.bindings.tasks = context.bindings.tasks.concat([duplicateTask]); errArr.push(response.message); + } else if(response.status === "success" && 
response.message.includes("Entry exists")) { + totalExistingRows += 1; + } else { + totalRowsCreated += 1; } } }); - if (totalRows === processed) { - context.log("Tasks Created: " + JSON.stringify(context.bindings.tasks) + " Blobpaths: " + JSON.stringify(allcontentlengths)); - if (errArr.length > 0) { - context.log.error(errArr.join('\n')); - } + + var msg = `FileOffSetMap Rows Created: ${totalRowsCreated} Existing Rows: ${totalExistingRows} Failed: ${errArr.length}`; + context.log(msg); + if (errArr.length > 0) { + context.log.error(`Failed for payload: allcontentlengths: ${JSON.stringify(allcontentlengths)} ErrorResponse: ${errArr.join('\n')}`); + context.done(msg) + } else { context.done(); } + } else { context.log(`eventHubMessages might not pertain to appendblob or files with supported extensions, Exit now!`); @@ -353,4 +310,4 @@ module.exports = async function (context, eventHubMessages) { context.log.error(error) context.done(error); } -}; \ No newline at end of file +}; diff --git a/AppendBlobReader/target/appendblob_producer_build/AppendBlobTaskProducer/index.js b/AppendBlobReader/target/appendblob_producer_build/AppendBlobTaskProducer/index.js index eae7affa..5554df18 100644 --- a/AppendBlobReader/target/appendblob_producer_build/AppendBlobTaskProducer/index.js +++ b/AppendBlobReader/target/appendblob_producer_build/AppendBlobTaskProducer/index.js @@ -134,10 +134,10 @@ function queryFiles(tableQuery, context) { allentities.push(entity); } - resolve(allentities); + return resolve(allentities); } catch (error) { context.log.error(`Error while fetching queryFiles: ${JSON.stringify(error)}`); - reject(error); + return reject(error); } }) } @@ -199,16 +199,20 @@ function batchUpdateOffsetTable(context, allentities, mode) { * and thus there is a chance that the file may get locked for a long time so the below function automatically * releases the lock after a threshold is breached. 
*/ -function getLockedEntitiesExceedingThreshold(context) { +function getLockedEntitiesExceedingThreshold(context, maxQueueingDelay) { - var maxlockThresholdMin = 15; + var maxlockThresholdMin = 30; + if (maxQueueingDelay > 30) { + context.log("WARNING maxQueueingDelay exceeding 30 minutes"); + maxlockThresholdMin = maxQueueingDelay; + } var dateVal = new Date(); dateVal.setMinutes(Math.max(0, dateVal.getMinutes() - maxlockThresholdMin)); var lockedFileQuery = `done eq ${true} and blobType eq '${'AppendBlob'}' and offset ge ${0} and lastEnqueLockTime le datetime'${dateVal.toISOString()}'` return queryFiles(lockedFileQuery, context).then(function (allentities) { - context.log("AppendBlob Locked Files exceeding maxlockThresholdMin: " + allentities.length); + context.log(`AppendBlob Locked Files exceeding maxlockThresholdMin of ${maxlockThresholdMin}: ${allentities.length}`); var unlockedEntities = allentities.map(function (entity) { - context.log("Unlocking Append Blob File with rowKey: %s lastEnqueLockTime: %s", entity.rowKey, entity.lastEnqueLockTime); + context.log.verbose("Unlocking Append Blob File with rowKey: %s lastEnqueLockTime: %s", entity.rowKey, entity.lastEnqueLockTime); return getunLockedEntity(entity); }); return unlockedEntities; @@ -278,16 +282,35 @@ function setBatchSizePerStorageAccount(newFiletasks) { filesPerStorageAccountCount[task.storageName] += 1; } }; - let MAX_READ_API_LIMIT_PER_SEC = 15000; - let MAX_GET_BLOB_REQUEST_PER_INVOKE = 25; + let MAX_READ_API_LIMIT_PER_SEC = 20000; // storage account read api limit + let MAX_GET_BLOB_REQUEST_PER_INVOKE = 50; // 50*4MB = 200MB max batch size for (let idx = 0; idx < newFiletasks.length; idx += 1) { task = newFiletasks[idx]; - let apiCallPerFile = Math.max(1, Math.floor(MAX_READ_API_LIMIT_PER_SEC / filesPerStorageAccountCount[task.storageName])); - task.batchSize = Math.min(MAX_GET_BLOB_REQUEST_PER_INVOKE, apiCallPerFile) * 12 * 1024 * 1024; + let apiCallPerFile = Math.max(1, Math.ceil(MAX_READ_API_LIMIT_PER_SEC / filesPerStorageAccountCount[task.storageName])); + task.batchSize = Math.min(MAX_GET_BLOB_REQUEST_PER_INVOKE, apiCallPerFile) * 4 * 1024 * 1024; // single request fetches 4MB } return newFiletasks; } +/** + * Returns the difference between date_a and date_b in minutes. + * Returns null when either date is missing or invalid. + */ +function getDateDifferenceInMinutes(date_a, date_b) { + + try { + if (!(date_a && date_b)) { + return null; + } + var dateVal_a = new Date(date_a); + var dateVal_b = new Date(date_b); + var diffMs = (dateVal_b - dateVal_a); + var diffMins = Math.round(diffMs / 60000); + return diffMins; + } catch { + return null; + } +} /** *First it fetches the unlocked append blob files rows and creates tasks for them in Event Hub * @@ -303,11 +326,13 @@ function getTasksForUnlockedFiles(context) { // fetching unlocked files which were not enqueued in last 5 minutes var existingFileQuery = `done eq ${false} and blobType eq '${'AppendBlob'}' and offset ge ${0} and ( not (lastEnqueLockTime lt '') or lastEnqueLockTime le datetime'${dateVal.toISOString()}')` return new Promise(function (resolve, reject) { - queryFiles(existingFileQuery, context).then(function (allentities) { + return queryFiles(existingFileQuery, context).then(function (allentities) { var newFiletasks = []; var archivedFiles = []; var newFileEntities = []; var lockedEntities = []; + var maxQueueingDelay = 0; + let currentQueueingDelay = null; allentities.forEach(function (entity) { if (isAppendBlobArchived(context, entity)) { archivedFiles.push(getunLockedEntity(entity)); @@ -320,6 +345,9 @@ function
getTasksForUnlockedFiles(context) { } context.log.verbose("Creating task for file: " + entity.rowKey); } + + maxQueueingDelay = Math.max(maxQueueingDelay, getDateDifferenceInMinutes(entity.lastEnqueLockTime, entity.updatedate)); + }); newFileEntities = getFixedNumberOfEntitiesbyEnqueTime(context, newFileEntities) newFileEntities.forEach(function (entity) { @@ -328,10 +356,9 @@ function getTasksForUnlockedFiles(context) { }); newFiletasks = setBatchSizePerStorageAccount(newFiletasks) context.log("New File Tasks created: " + newFiletasks.length + " AppendBlob Archived Files: " + archivedFiles.length); - resolve([newFiletasks, archivedFiles, lockedEntities]); + return resolve([newFiletasks, archivedFiles, lockedEntities, maxQueueingDelay]); }).catch(function (error) { - context.log.error(`Error in getting new tasks, Error: ${JSON.stringify(error)}`); - reject(error); + return reject(error); }); }); } @@ -350,10 +377,11 @@ function PollAppendBlobFiles(context) { var newFiletasks = r[0]; var archivedRowEntities = r[1]; var entitiesToUpdate = r[2]; + var maxQueueingDelay = r[3]; context.bindings.tasks = context.bindings.tasks.concat(newFiletasks); context.log.verbose("new file tasks", newFiletasks); var batch_promises = [ - getLockedEntitiesExceedingThreshold(context).then(function (unlockedEntities) { + getLockedEntitiesExceedingThreshold(context, maxQueueingDelay).then(function (unlockedEntities) { // setting lock for new tasks and unsetting lock for old tasks entitiesToUpdate = entitiesToUpdate.concat(unlockedEntities); return batchUpdateOffsetTable(context, entitiesToUpdate, "insert"); diff --git a/AppendBlobReader/target/producer_build/AppendBlobFileTracker/function.json b/AppendBlobReader/target/producer_build/AppendBlobFileTracker/function.json index 59d3e05c..5372890d 100644 --- a/AppendBlobReader/target/producer_build/AppendBlobFileTracker/function.json +++ b/AppendBlobReader/target/producer_build/AppendBlobFileTracker/function.json @@ -8,15 +8,7 @@ "connection": "AzureEventHubConnectionString", "cardinality": "many", "consumerGroup": "$Default" - }, - { - "type": "serviceBus", - "connection": "TaskQueueConnectionString", - "name": "tasks", - "queueName": "blobrangetaskqueue", - "accessRights": "Manage", - "direction": "out" } ], "disabled": false -} \ No newline at end of file +} diff --git a/AppendBlobReader/target/producer_build/AppendBlobFileTracker/index.js b/AppendBlobReader/target/producer_build/AppendBlobFileTracker/index.js index 24506d0a..97997865 100644 --- a/AppendBlobReader/target/producer_build/AppendBlobFileTracker/index.js +++ b/AppendBlobReader/target/producer_build/AppendBlobFileTracker/index.js @@ -99,31 +99,6 @@ function getContentLengthPerBlob(eventHubMessages, allcontentlengths, metadatama }); } -/** - * @param {} PartitionKey - * @param {} RowKey - * @param {} context - * - * retrieves the offset for a row from the table - */ -async function getBlobPointerMap(partitionKey, rowKey, context) { - // Todo Add retries for node migration in cases of timeouts(non 400 & 500 errors) - var statusCode = 200; - try { - var entity = await tableClient.getEntity(partitionKey, rowKey); - //context.log("retreived existing rowkey: " + rowKey) - } catch (err) { - // err object keys : [ 'name', 'code', 'statusCode', 'request', 'response', 'details' ] - if (err.statusCode === 404) { - //context.log("no existing row found, new file scenario for rowkey: " + rowKey) - statusCode = 404; - } else { - throw err; - } - } - //context.log({statusCode: statusCode, entity: entity}); - return { 
statusCode: statusCode, entity: entity }; -} /** * Update Blob Pointer Map. @@ -131,7 +106,7 @@ async function getBlobPointerMap(partitionKey, rowKey, context) { * @param {Object} entity - The entity object to update or create. * @returns {Promise} - A promise that resolves to the response from updating or creating the entity. */ -async function updateBlobPointerMap(entity) { +async function updateOrCreateBlobPointerMap(entity) { let response; if (entity.options) { let options = entity.options; @@ -149,7 +124,6 @@ async function updateBlobPointerMap(entity) { /** * @param {} PartitionKey * @param {} RowKey - * @param {} sortedcontentlengths * @param {} context * @param {} metadata * @param {} finalcontext @@ -157,54 +131,37 @@ async function updateBlobPointerMap(entity) { * If the contentLength is 0 then it is assumed to be Append Blobs else it is Block Blob * In case of Append blob a row entry is created with negative offset and all the rows with Append blob blob types are polled by append blob producer function * - * In cases of Block blob a task is created in service bus and consumer function consumes the task by download the file using the offset in task metadata and then sending to sumo logic's http endpoint */ -async function createTasksForBlob(partitionKey, rowKey, sortedcontentlengths, context, metadata) { - //context.log("inside createTasksForBlob", partitionKey, rowKey, sortedcontentlengths, metadata); - if (sortedcontentlengths.length === 0) { - return Promise.resolve({ status: "success", message: "No tasks created for rowKey: " + rowKey }); - } +async function createTasksForAppendBlob(partitionKey, rowKey, context, metadata) { + //context.log("inside createTasksForAppendBlob", partitionKey, rowKey, sortedcontentlengths, metadata); + + context.log.verbose("Creating an entry for RowKey: ", rowKey) try { - var retrievedResponse = await getBlobPointerMap(partitionKey, rowKey, context); - //context.log("retrieved blob pointer successsfully for rowkey: " + rowKey + " response: "+ retrievedResponse) + var entity = getEntity(metadata, 0, null); + var response = await updateOrCreateBlobPointerMap(entity); // this will always create + return Promise.resolve({ status: "success", rowKey: rowKey, message: "AppendBlob Entry added for RowKey: " + rowKey }); } catch (err) { - // unable to retrieve offset, hence ingesting whole file from starting byte - let lastoffset = sortedcontentlengths[sortedcontentlengths.length - 1] - 1; - return Promise.reject({ status: "failed", rowKey: rowKey, message: "Unable to Retrieve offset for rowKey: " + rowKey + " Error: " + err, lastoffset: lastoffset, currentoffset: -1 }); - } - var currentoffset = retrievedResponse.statusCode === 404 ? -1 : Number(retrievedResponse.entity.offset); - var currentEtag = retrievedResponse.statusCode === 404 ? 
null : retrievedResponse.entity.etag; - var [tasks, lastoffset] = getNewTask(currentoffset, sortedcontentlengths, metadata); - - if (tasks.length > 0) { // modify offset only when it's been changed - var entity = getEntity(metadata, lastoffset, currentEtag); - try { - var updatedResponse = await updateBlobPointerMap(entity); - //context.log("updated blob pointer successsfully for rowkey: " + rowKey + " response: "+ updatedResponse) - context.bindings.tasks = context.bindings.tasks.concat(tasks); - return Promise.resolve({ status: "success", rowKey: rowKey, message: tasks.length + " Tasks added for rowKey: " + rowKey }); - } catch (err) { - if (err && err.details && err.details.odataError && err.details.odataError.code === "UpdateConditionNotSatisfied" && err.statusCode === 412) { - context.log.verbose("Need to Retry: " + rowKey); - } - return Promise.reject({ status: "failed", rowKey: rowKey, message: "Unable to Update offset for rowKey: " + rowKey + " Error: " + err, lastoffset: lastoffset, currentoffset: currentoffset }); - } - } else if (currentoffset === -1 && lastoffset === -1) { - context.log("Append blob scenario create just an entry RowKey: ", rowKey) - try { - var entity = getEntity(metadata, 0, currentEtag); - var updatedResponse = await updateBlobPointerMap(entity); - context.bindings.tasks = context.bindings.tasks.concat(tasks); - return Promise.resolve({ status: "success", rowKey: rowKey, message: "AppendBlob Entry added for RowKey: " + rowKey }); - } catch (err) { - if (err.code === "UpdateConditionNotSatisfied") { - context.log("Need to Retry: " + rowKey, entity); + + if (err.statusCode === 409 && JSON.stringify(err).includes("EntityAlreadyExists")) { + context.log("AppendBlob Entry exists for RowKey: " + rowKey); + return Promise.resolve({ status: "success", rowKey: rowKey, message: "AppendBlob Entry exists for RowKey: " + rowKey }); + } else if ((err.statusCode === 404) && JSON.stringify(err).includes("TableNotFound")) { + try { + context.log(`Creating table in storage account: ${process.env.TABLE_NAME}`); + await tableClient.createTable(); + await new Promise(resolve => setTimeout(resolve, 10000)); // 10 second wait for the table to be created + var response = await updateOrCreateBlobPointerMap(entity); // this will always create + return Promise.resolve({ status: "success", rowKey: rowKey, message: "AppendBlob Entry added for RowKey: " + rowKey }); + } catch(err) { + return Promise.reject({ status: "failed", rowKey: rowKey, message: `Failed to create table for rowKey: ${rowKey} error: ${JSON.stringify(err)}`}); } - return Promise.reject({ status: "failed", rowKey: rowKey, message: "Unable to Update offset for rowKey: " + rowKey }); + + } else { + return Promise.reject({ status: "failed", rowKey: rowKey, message: `Failed to create AppendBlob Entry for rowKey: ${rowKey} error: ${JSON.stringify(err)}` }); } - } else { - return Promise.resolve({ status: "success", rowKey: rowKey, message: "No tasks created for rowKey: " + rowKey }); + } + } @@ -231,7 +188,7 @@ function getNewTask(currentoffset, sortedcontentlengths, metadata) { /** * Filter messages for AppendBlob. - * + * * @param {Array} messages - An array of message objects to filter. * @returns {Array} - An array containing only the messages with AppendBlob type. */ @@ -245,7 +202,7 @@ function filterAppendBlob(messages) { /** * Filter messages by file extension. - * + * * @param {Object} context - The context object for logging or other operations. * @param {Array} messages - An array of message objects to filter. 
* @returns {Array} - An array containing only the messages with supported file extensions. @@ -275,8 +232,8 @@ module.exports = async function (context, eventHubMessages) { // eventHubMessages = [ // [ // { - // topic: '/subscriptions/c088dc46-d692-42ad-a4b6-9a542d28ad2a/resourceGroups/SumoAuditCollection/providers/Microsoft.Storage/storageAccounts/allbloblogseastus', - // subject: '/blobServices/default/containers/testabb/blobs/dummy_data.csv', + // topic: '/subscriptions/c088dc46-d692-42ad-a4b6-9a542d28ad2a/resourceGroups/testsumosarg290524101050/providers/Microsoft.Storage/storageAccounts/testsa290524101050', + // subject: '/blobServices/default/containers/testcontainer-29-05-24-10-10-50/blobs/test.blob', // eventType: 'Microsoft.Storage.BlobCreated', // id: '05bad26c-b01e-0064-59cf-6a5397068de0', // data: { @@ -286,7 +243,7 @@ module.exports = async function (context, eventHubMessages) { // contentType: 'text/csv', // contentLength: 0, // blobType: 'AppendBlob', - // url: 'https://allbloblogseastus.blob.core.windows.net/testabb/dummy_data.csv', + // url: 'https://testsa290524101050.blob.core.windows.net/testcontainer-29-05-24-10-10-50/test.blob', // sequencer: '00000000000000000000000000010F8A0000000000047e92', // storageDiagnostics: { batchId: 'd7656e84-7006-0036-00cf-6a2f7f000000' } // }, @@ -311,39 +268,39 @@ module.exports = async function (context, eventHubMessages) { var metadatamap = {}; var allcontentlengths = {}; getContentLengthPerBlob(filterMessages, allcontentlengths, metadatamap); - var processed = 0; - context.bindings.tasks = []; + var totalRowsCreated = 0, totalExistingRows = 0; var allRowPromises = []; var totalRows = Object.keys(allcontentlengths).length; var errArr = [], rowKey; for (rowKey in allcontentlengths) { - var sortedcontentlengths = allcontentlengths[rowKey].sort(); // ensuring increasing order of contentlengths var metadata = metadatamap[rowKey]; var partitionKey = metadata.containerName; - allRowPromises.push(sumoutils.p_retryMax(createTasksForBlob, MaxAttempts, RetryInterval, [partitionKey, rowKey, sortedcontentlengths, context, metadata], context).catch((err) => err)); + allRowPromises.push(sumoutils.p_retryMax(createTasksForAppendBlob, MaxAttempts, RetryInterval, [partitionKey, rowKey, context, metadata], context).catch((err) => err)); } + // Fail the function if any one of the files fail to get inserted into FileOffsetMap + await Promise.all(allRowPromises).then((responseValues) => { - //creating duplicate task for file causing an error when update condition is not satisfied in mutiple read and write scenarios for same row key in fileOffSetMap table + for (let response of responseValues) { - processed += 1; if (response.status === "failed") { - context.log.verbose("creating duplicate task since retry failed for rowkey: " + response.rowKey); - var duplicateTask = Object.assign({ - startByte: response.currentoffset + 1, - endByte: response.lastoffset - }, metadatamap[response.rowKey]); - context.bindings.tasks = context.bindings.tasks.concat([duplicateTask]); errArr.push(response.message); + } else if(response.status === "success" && response.message.includes("Entry exists")) { + totalExistingRows += 1; + } else { + totalRowsCreated += 1; } } }); - if (totalRows === processed) { - context.log("Tasks Created: " + JSON.stringify(context.bindings.tasks) + " Blobpaths: " + JSON.stringify(allcontentlengths)); - if (errArr.length > 0) { - context.log.error(errArr.join('\n')); - } + + var msg = `FileOffSetMap Rows Created: ${totalRowsCreated} Existing Rows: 
${totalExistingRows} Failed: ${errArr.length}`; + context.log(msg); + if (errArr.length > 0) { + context.log.error(`Failed for payload: allcontentlengths: ${JSON.stringify(allcontentlengths)} ErrorResponse: ${errArr.join('\n')}`); + context.done(msg) + } else { context.done(); } + } else { context.log(`eventHubMessages might not pertain to appendblob or files with supported extensions, Exit now!`); @@ -353,4 +310,4 @@ module.exports = async function (context, eventHubMessages) { context.log.error(error) context.done(error); } -}; \ No newline at end of file +}; diff --git a/AppendBlobReader/tests/offset.test.js b/AppendBlobReader/tests/offset.test.js index 713b3b05..25a25b82 100644 --- a/AppendBlobReader/tests/offset.test.js +++ b/AppendBlobReader/tests/offset.test.js @@ -20,7 +20,7 @@ context.log.verbose = function (message) { }; var sendOptions = { - urlString: 'https://collectors.sumologic.com/receiver/v1/http/ZaVnC4dhaV0hr0BM3rtK7_BJJkeUzqkGmGly-2SjDIFTwjUBjaAA2Afx6q-u34DxN2jtbYfvVoabBV-diHc7fxKcU0dw-uELJyVpEEGDxtGKGd-btZLqzg==', + urlString: '', MaxAttempts: 3, RetryInterval: 3000, compress_data: true, @@ -125,4 +125,4 @@ test.concurrent('Parse log T4 to equal R4', async () => { newOffset = parseInt(serviceBusTask.startByte, 10) + curoutputData; expect(newOffset).toBe(expectedOffset); -}, 10000); \ No newline at end of file +}, 10000); diff --git a/AppendBlobReader/tests/test_appendblobreader.py b/AppendBlobReader/tests/test_appendblobreader.py index 12175b11..ea2b7ac1 100644 --- a/AppendBlobReader/tests/test_appendblobreader.py +++ b/AppendBlobReader/tests/test_appendblobreader.py @@ -47,7 +47,7 @@ def test_01_pipeline(self): self.deploy_template() self.assertTrue(self.resource_group_exists(self.resource_group_name)) self.table_service = self.get_table_service() - self.create_offset_table(self.offsetmap_table_name) + self.create_offset_table(self.offsetmap_table_name) # now this gets created automatically def test_02_resource_count(self): expected_resource_count = 12 # 10 + 2(microsoft.insights/autoscalesettings) @@ -65,7 +65,7 @@ def test_03_func_logs(self): azurefunction = "AppendBlobFileTracker" captured_output = self.fetchlogs(app_insights.name, azurefunction) - message = "Append blob scenario create just an entry RowKey:" + message = "FileOffSetMap Rows Created: 1 Existing Rows: 0 Failed: 0" self.assertTrue(self.filter_logs(captured_output, 'message', message), f"No '{message}' log line found in '{azurefunction}' function logs") expected_count = 1 diff --git a/BlockBlobReader/src/blobreaderdeploy.json b/BlockBlobReader/src/blobreaderdeploy.json index 45921810..b7195073 100644 --- a/BlockBlobReader/src/blobreaderdeploy.json +++ b/BlockBlobReader/src/blobreaderdeploy.json @@ -695,13 +695,13 @@ ] }, { - "type": "microsoft.operationalinsights/workspaces", + "type": "Microsoft.OperationalInsights/workspaces", "apiVersion": "2022-10-01", "name": "[parameters('logAnalyticsWorkspaceName')]", "location": "[parameters('location')]", "properties": { "sku": { - "name": "pergb2018" + "name": "PerGB2018" }, "retentionInDays": 30, "features": { @@ -723,8 +723,11 @@ "properties": { "Application_Type": "web", "applicationId": "[parameters('appInsightsName')]", - "WorkspaceResourceId": "[resourceId('microsoft.operationalinsights/workspaces', parameters('logAnalyticsWorkspaceName'))]" - } + "WorkspaceResourceId": "[resourceId('Microsoft.OperationalInsights/workspaces', parameters('logAnalyticsWorkspaceName'))]" + }, + "dependsOn": [ + "[resourceId('Microsoft.OperationalInsights/workspaces', 
parameters('logAnalyticsWorkspaceName'))]" + ] } ] } diff --git a/BlockBlobReader/src/blobreaderdeploywithPremiumPlan.json b/BlockBlobReader/src/blobreaderdeploywithPremiumPlan.json index c1039f7c..b8228555 100644 --- a/BlockBlobReader/src/blobreaderdeploywithPremiumPlan.json +++ b/BlockBlobReader/src/blobreaderdeploywithPremiumPlan.json @@ -689,13 +689,13 @@ ] }, { - "type": "microsoft.operationalinsights/workspaces", + "type": "Microsoft.OperationalInsights/workspaces", "apiVersion": "2022-10-01", "name": "[parameters('logAnalyticsWorkspaceName')]", "location": "[parameters('location')]", "properties": { "sku": { - "name": "pergb2018" + "name": "PerGB2018" }, "retentionInDays": 30, "features": { @@ -717,8 +717,11 @@ "properties": { "Application_Type": "web", "applicationId": "[parameters('appInsightsName')]", - "WorkspaceResourceId": "[resourceId('microsoft.operationalinsights/workspaces', parameters('logAnalyticsWorkspaceName'))]" - } + "WorkspaceResourceId": "[resourceId('Microsoft.OperationalInsights/workspaces', parameters('logAnalyticsWorkspaceName'))]" + }, + "dependsOn": [ + "[resourceId('Microsoft.OperationalInsights/workspaces', parameters('logAnalyticsWorkspaceName'))]" + ] } ] } diff --git a/EventHubs/src/azuredeploy_metrics.json b/EventHubs/src/azuredeploy_metrics.json index 5901b846..34c19af6 100644 --- a/EventHubs/src/azuredeploy_metrics.json +++ b/EventHubs/src/azuredeploy_metrics.json @@ -325,13 +325,13 @@ ] }, { - "type": "microsoft.operationalinsights/workspaces", + "type": "Microsoft.OperationalInsights/workspaces", "apiVersion": "2022-10-01", "name": "[parameters('logAnalyticsWorkspaceName')]", "location": "[parameters('location')]", "properties": { "sku": { - "name": "pergb2018" + "name": "PerGB2018" }, "retentionInDays": 30, "features": { @@ -353,8 +353,11 @@ "properties": { "Application_Type": "web", "applicationId": "[parameters('appInsightsName')]", - "WorkspaceResourceId": "[resourceId('microsoft.operationalinsights/workspaces', parameters('logAnalyticsWorkspaceName'))]" - } + "WorkspaceResourceId": "[resourceId('Microsoft.OperationalInsights/workspaces', parameters('logAnalyticsWorkspaceName'))]" + }, + "dependsOn": [ + "[resourceId('Microsoft.OperationalInsights/workspaces', parameters('logAnalyticsWorkspaceName'))]" + ] } ] }
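The batch-size change in setBatchSizePerStorageAccount is easiest to see with concrete numbers. Below is a minimal sketch of the arithmetic, assuming a 4 MB range per Get Blob request as the new comments state; the helper name batchSizeFor is only for illustration.

// Sketch of the per-file batch size computed by setBatchSizePerStorageAccount.
const MAX_READ_API_LIMIT_PER_SEC = 20000;   // storage account read API limit
const MAX_GET_BLOB_REQUEST_PER_INVOKE = 50; // 50 * 4MB = 200MB max batch size
const BYTES_PER_REQUEST = 4 * 1024 * 1024;  // single request fetches 4MB

function batchSizeFor(filesInStorageAccount) {
    // Split the per-second read API budget evenly across files, capped per invocation.
    const apiCallPerFile = Math.max(1, Math.ceil(MAX_READ_API_LIMIT_PER_SEC / filesInStorageAccount));
    return Math.min(MAX_GET_BLOB_REQUEST_PER_INVOKE, apiCallPerFile) * BYTES_PER_REQUEST;
}

console.log(batchSizeFor(1));    // 50 requests * 4MB = 209715200 bytes (200MB)
console.log(batchSizeFor(1000)); // ceil(20000 / 1000) = 20 requests -> 83886080 bytes (80MB)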
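The new maxQueueingDelay plumbing ties getDateDifferenceInMinutes to the lock-release threshold in getLockedEntitiesExceedingThreshold. A minimal sketch of that flow follows; the rowKey values and timestamps are hypothetical FileOffsetMap rows made up for illustration.

// Sketch: deriving maxQueueingDelay and the resulting unlock threshold.
function getDateDifferenceInMinutes(date_a, date_b) {
    if (!(date_a && date_b)) {
        return null;
    }
    const diffMs = new Date(date_b) - new Date(date_a);
    return Math.round(diffMs / 60000);
}

const entities = [
    { rowKey: "blob-1", lastEnqueLockTime: "2024-05-29T10:00:00Z", updatedate: "2024-05-29T10:20:00Z" },
    { rowKey: "blob-2", lastEnqueLockTime: "2024-05-29T09:00:00Z", updatedate: "2024-05-29T09:45:00Z" },
];

let maxQueueingDelay = 0;
for (const entity of entities) {
    maxQueueingDelay = Math.max(maxQueueingDelay,
        getDateDifferenceInMinutes(entity.lastEnqueLockTime, entity.updatedate));
}

// Mirrors getLockedEntitiesExceedingThreshold: the default 30 minute threshold is
// only raised when processing is observed to lag further behind than that.
const maxlockThresholdMin = maxQueueingDelay > 30 ? maxQueueingDelay : 30;
console.log(maxQueueingDelay, maxlockThresholdMin); // 45 45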
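The reworked AppendBlobFileTracker no longer reads the existing offset before writing; it simply tries to create the FileOffsetMap row and treats a conflict as success. A minimal sketch of that create-if-absent pattern, assuming tableClient is an @azure/data-tables TableClient as in the producer; createIfAbsent is an illustrative name, and the real createTasksForAppendBlob additionally creates the table and retries when it gets TableNotFound, wrapped in sumoutils.p_retryMax.

// Sketch: idempotent FileOffsetMap row creation used by the file tracker.
async function createIfAbsent(tableClient, entity, context) {
    try {
        await tableClient.createEntity(entity);
        return { status: "success", rowKey: entity.rowKey, message: "AppendBlob Entry added for RowKey: " + entity.rowKey };
    } catch (err) {
        if (err.statusCode === 409) {
            // Another invocation already created the row; nothing left to do.
            context.log("AppendBlob Entry exists for RowKey: " + entity.rowKey);
            return { status: "success", rowKey: entity.rowKey, message: "AppendBlob Entry exists for RowKey: " + entity.rowKey };
        }
        throw err; // other failures propagate so the retry wrapper can handle them
    }
}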