Merge pull request #109 from SumoLogic/hpal_offsettable_fix
SUMO-241562: The File Tracker function should fail if the FileOffsetMap table does not exist, so that no data gets ingested.
himanshu219 authored May 29, 2024
2 parents fca64ca + 4b428cc commit 765d4ee
Showing 12 changed files with 248 additions and 248 deletions.
30 changes: 30 additions & 0 deletions .github/workflows/syntax-validation-test.yaml
@@ -0,0 +1,30 @@
name: "Syntax validation tests"
on: [workflow_dispatch, pull_request]

jobs:
package-version-test:
runs-on: ubuntu-latest
strategy:
matrix:
include: # each entry below adds one job to the matrix
- dir: 'BlockBlobReader/target/consumer_build'
- dir: 'BlockBlobReader/target/dlqprocessor_build'
- dir: 'BlockBlobReader/target/producer_build'
- dir: 'AppendBlobReader/target/producer_build'
- dir: 'AppendBlobReader/target/appendblob_producer_build'
- dir: 'AppendBlobReader/target/consumer_build'
- dir: 'EventHubs/target/metrics_build'
steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Setup node
uses: actions/setup-node@v4
with:
node-version: 18

- working-directory: ${{ matrix.dir }}
run: |
find ./ -name '*.js' | xargs -n1 node --check
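
A rough in-process equivalent of the node --check invocation above (a sketch for illustration, not the workflow's actual mechanism): vm.Script compiles a file without executing it and throws a SyntaxError if parsing fails.

// sketch: syntax-check a file the way `node --check` does, without running it
const { readFileSync } = require('node:fs');
const vm = require('node:vm');

function syntaxCheck(file) {
    try {
        new vm.Script(readFileSync(file, 'utf8'), { filename: file });
        return true;
    } catch (err) {
        console.error(`${file}: ${err.message}`);
        return false;
    }
}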
58 changes: 43 additions & 15 deletions AppendBlobReader/src/appendblobproducer.js
@@ -134,10 +134,10 @@ function queryFiles(tableQuery, context) {
                allentities.push(entity);
            }

-           resolve(allentities);
+           return resolve(allentities);
        } catch (error) {
            context.log.error(`Error while fetching queryFiles: ${JSON.stringify(error)}`);
-           reject(error);
+           return reject(error);
        }
    })
}
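
The resolve/reject calls in this hunk gained explicit returns. A minimal standalone illustration of why: settling a promise does not stop the executor, so code after resolve() still runs, and only the first settlement counts.

new Promise(function (resolve, reject) {
    resolve("first");
    console.log("still runs");    // executes even after resolve()
    reject(new Error("ignored")); // no effect: a promise settles only once
}).then(console.log);             // prints "first"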
@@ -199,16 +199,20 @@ function batchUpdateOffsetTable(context, allentities, mode) {
 * and thus there is a chance that the file may get locked for a long time so the below function automatically
 * releases the lock after a threshold is breached.
 */
-function getLockedEntitiesExceedingThreshold(context) {
+function getLockedEntitiesExceedingThreshold(context, maxQueueingDelay) {

-    var maxlockThresholdMin = 15;
+    var maxlockThresholdMin = 30;
+    if (maxQueueingDelay > 30) {
+        context.log("WARNING maxQueueingDelay exceeding 30 minutes");
+        maxlockThresholdMin = maxQueueingDelay;
+    }
    var dateVal = new Date();
    dateVal.setMinutes(Math.max(0, dateVal.getMinutes() - maxlockThresholdMin));
    var lockedFileQuery = `done eq ${true} and blobType eq '${'AppendBlob'}' and offset ge ${0} and lastEnqueLockTime le datetime'${dateVal.toISOString()}'`
    return queryFiles(lockedFileQuery, context).then(function (allentities) {
-        context.log("AppendBlob Locked Files exceeding maxlockThresholdMin: " + allentities.length);
+        context.log(`AppendBlob Locked Files exceeding maxlockThresholdMin of ${maxlockThresholdMin}: ${allentities.length}`);
        var unlockedEntities = allentities.map(function (entity) {
-            context.log("Unlocking Append Blob File with rowKey: %s lastEnqueLockTime: %s", entity.rowKey, entity.lastEnqueLockTime);
+            context.log.verbose("Unlocking Append Blob File with rowKey: %s lastEnqueLockTime: %s", entity.rowKey, entity.lastEnqueLockTime);
            return getunLockedEntity(entity);
        });
        return unlockedEntities;
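
A minimal sketch (with a hypothetical delay value) of the cutoff this function computes: with maxQueueingDelay = 45, files whose lastEnqueLockTime is older than 45 minutes match lockedFileQuery and have their lock released.

var maxQueueingDelay = 45;
var maxlockThresholdMin = maxQueueingDelay > 30 ? maxQueueingDelay : 30; // 45
var cutoff = new Date(Date.now() - maxlockThresholdMin * 60 * 1000);
console.log(cutoff.toISOString()); // compared against lastEnqueLockTime in the query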
@@ -278,16 +282,35 @@ function setBatchSizePerStorageAccount(newFiletasks) {
            filesPerStorageAccountCount[task.storageName] += 1;
        }
    };
-    let MAX_READ_API_LIMIT_PER_SEC = 15000;
-    let MAX_GET_BLOB_REQUEST_PER_INVOKE = 25;
+    let MAX_READ_API_LIMIT_PER_SEC = 20000; // storage account read api limit
+    let MAX_GET_BLOB_REQUEST_PER_INVOKE = 50; // 50 * 4 MB = 200 MB max batch size
    for (let idx = 0; idx < newFiletasks.length; idx += 1) {
        task = newFiletasks[idx];
-        let apiCallPerFile = Math.max(1, Math.floor(MAX_READ_API_LIMIT_PER_SEC / filesPerStorageAccountCount[task.storageName]));
-        task.batchSize = Math.min(MAX_GET_BLOB_REQUEST_PER_INVOKE, apiCallPerFile) * 12 * 1024 * 1024;
+        let apiCallPerFile = Math.max(1, Math.ceil(MAX_READ_API_LIMIT_PER_SEC / filesPerStorageAccountCount[task.storageName]));
+        task.batchSize = Math.min(MAX_GET_BLOB_REQUEST_PER_INVOKE, apiCallPerFile) * 4 * 1024 * 1024; // a single request fetches 4 MB
    }
    return newFiletasks;
}

+/**
+ * Returns the difference between two dates in minutes (rounded), or null if
+ * either date is missing.
+ */
+function getDateDifferenceInMinutes(date_a, date_b) {
+
+    try {
+        if (!(date_a && date_b)) {
+            return null;
+        }
+        var dateVal_a = new Date(date_a);
+        var dateVal_b = new Date(date_b);
+        var diffMs = (dateVal_b - dateVal_a);
+        var diffMins = Math.round(diffMs / 60000);
+        return diffMins;
+    } catch {
+        return null;
+    }
+}
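
Two self-contained illustrations of the arithmetic above, using hypothetical file counts and timestamps:

const MAX_READ_API_LIMIT_PER_SEC = 20000;
const MAX_GET_BLOB_REQUEST_PER_INVOKE = 50;

// batchSize assigned to each file, given how many files share one storage account
function batchSizeFor(filesInAccount) {
    const apiCallPerFile = Math.max(1, Math.ceil(MAX_READ_API_LIMIT_PER_SEC / filesInAccount));
    return Math.min(MAX_GET_BLOB_REQUEST_PER_INVOKE, apiCallPerFile) * 4 * 1024 * 1024;
}
console.log(batchSizeFor(100) / (1024 * 1024));  // 200 -> capped at 50 requests x 4 MB
console.log(batchSizeFor(2000) / (1024 * 1024)); // 40  -> 10 requests x 4 MB each

// condensed form of getDateDifferenceInMinutes, as defined in the diff above
function dateDiffMinutes(a, b) {
    if (!(a && b)) return null;
    return Math.round((new Date(b) - new Date(a)) / 60000);
}
console.log(dateDiffMinutes('2024-05-29T10:00:00Z', '2024-05-29T10:42:30Z')); // 43
console.log(dateDiffMinutes(null, '2024-05-29T10:42:30Z'));                   // null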
/**
 * First it fetches the unlocked append blob files rows and creates tasks for them in Event Hub
 *
@@ -303,11 +326,13 @@ function getTasksForUnlockedFiles(context) {
    // fetching unlocked files which were not enqueued in last 5 minutes
    var existingFileQuery = `done eq ${false} and blobType eq '${'AppendBlob'}' and offset ge ${0} and ( not (lastEnqueLockTime lt '') or lastEnqueLockTime le datetime'${dateVal.toISOString()}')`
    return new Promise(function (resolve, reject) {
-        queryFiles(existingFileQuery, context).then(function (allentities) {
+        return queryFiles(existingFileQuery, context).then(function (allentities) {
            var newFiletasks = [];
            var archivedFiles = [];
            var newFileEntities = [];
            var lockedEntities = [];
+            var maxQueueingDelay = 0;
+            let currentQueueingDelay = null;
            allentities.forEach(function (entity) {
                if (isAppendBlobArchived(context, entity)) {
                    archivedFiles.push(getunLockedEntity(entity));
@@ -320,6 +345,9 @@
                }
                context.log.verbose("Creating task for file: " + entity.rowKey);
            }
+
+            maxQueueingDelay = Math.max(maxQueueingDelay, getDateDifferenceInMinutes(entity.lastEnqueLockTime, entity.updatedate));
+
        });
        newFileEntities = getFixedNumberOfEntitiesbyEnqueTime(context, newFileEntities)
        newFileEntities.forEach(function (entity) {
@@ -328,10 +356,9 @@
            });
            newFiletasks = setBatchSizePerStorageAccount(newFiletasks)
            context.log("New File Tasks created: " + newFiletasks.length + " AppendBlob Archived Files: " + archivedFiles.length);
-            resolve([newFiletasks, archivedFiles, lockedEntities]);
+            return resolve([newFiletasks, archivedFiles, lockedEntities, maxQueueingDelay]);
        }).catch(function (error) {
            context.log.error(`Error in getting new tasks, Error: ${JSON.stringify(error)}`);
-            reject(error);
+            return reject(error);
        });
    });
}
@@ -350,10 +377,11 @@ function PollAppendBlobFiles(context) {
        var newFiletasks = r[0];
        var archivedRowEntities = r[1];
        var entitiesToUpdate = r[2];
+        var maxQueueingDelay = r[3];
        context.bindings.tasks = context.bindings.tasks.concat(newFiletasks);
        context.log.verbose("new file tasks", newFiletasks);
        var batch_promises = [
-            getLockedEntitiesExceedingThreshold(context).then(function (unlockedEntities) {
+            getLockedEntitiesExceedingThreshold(context, maxQueueingDelay).then(function (unlockedEntities) {
                // setting lock for new tasks and unsetting lock for old tasks
                entitiesToUpdate = entitiesToUpdate.concat(unlockedEntities);
                return batchUpdateOffsetTable(context, entitiesToUpdate, "insert");
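
A condensed sketch of the new data flow (function bodies elided; it mirrors the calls shown in the hunks above): the queueing delay measured while building tasks feeds back into the lock-release threshold, so slow-but-healthy files are not unlocked prematurely.

getTasksForUnlockedFiles(context).then(function (r) {
    var entitiesToUpdate = r[2];
    var maxQueueingDelay = r[3]; // minutes, from getDateDifferenceInMinutes
    return getLockedEntitiesExceedingThreshold(context, maxQueueingDelay)
        .then(function (unlockedEntities) {
            return batchUpdateOffsetTable(context, entitiesToUpdate.concat(unlockedEntities), "insert");
        });
});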
17 changes: 8 additions & 9 deletions AppendBlobReader/src/appendblobreaderdeploy.json
@@ -537,10 +537,6 @@
            "name": "AzureEventHubConnectionString",
            "value": "[concat(listkeys(resourceId('Microsoft.EventHub/namespaces/authorizationRules', parameters('namespaces_BlobReaderNamespace_name'),parameters('AuthorizationRules_RootManageSharedAccessKey_EventHub_name')), '2022-10-01-preview').primaryConnectionString,';EntityPath=',parameters('eventhubs_blobreadereventhub_name'))]"
        },
-        {
-            "name": "TaskQueueConnectionString",
-            "value": "[listkeys(resourceId('Microsoft.ServiceBus/namespaces/authorizationRules', parameters('namespaces_blobreadertaskqueue_name'),parameters('AuthorizationRules_RootManageSharedAccessKey_TaskQueue_name')), '2022-10-01-preview').primaryConnectionString]"
-        },
        {
            "name": "WEBSITE_NODE_DEFAULT_VERSION",
            "value": "~18"
@@ -870,13 +866,13 @@
            ]
        },
        {
-            "type": "microsoft.operationalinsights/workspaces",
+            "type": "Microsoft.OperationalInsights/workspaces",
            "apiVersion": "2022-10-01",
            "name": "[parameters('logAnalyticsWorkspaceName')]",
            "location": "[parameters('location')]",
            "properties": {
                "sku": {
-                    "name": "pergb2018"
+                    "name": "PerGB2018"
                },
                "retentionInDays": 30,
                "features": {
@@ -898,8 +894,11 @@
            "properties": {
                "Application_Type": "web",
                "applicationId": "[parameters('appInsightsName')]",
-                "WorkspaceResourceId": "[resourceId('microsoft.operationalinsights/workspaces', parameters('logAnalyticsWorkspaceName'))]"
-            }
+                "WorkspaceResourceId": "[resourceId('Microsoft.OperationalInsights/workspaces', parameters('logAnalyticsWorkspaceName'))]"
+            },
+            "dependsOn": [
+                "[resourceId('Microsoft.OperationalInsights/workspaces', parameters('logAnalyticsWorkspaceName'))]"
+            ]
        }
    ]
}
}