Skip to content

Commit

Permalink
Add coverage for CNodeHealthManager (#3167)
Browse files Browse the repository at this point in the history
  • Loading branch information
theoilie authored May 27, 2022
1 parent 8f9fe4c commit a6f0272
Show file tree
Hide file tree
Showing 3 changed files with 613 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ class StateMonitoringQueue {
config.get('redisHost'),
config.get('redisPort')
)
this.registerQueueEventHandlers({
this.registerQueueEventHandlersAndJobProcessor({
queue: this.queue,
jobSuccessCallback: this.enqueueJobAfterSuccess,
jobFailureCallback: this.enqueueJobAfterFailure
jobFailureCallback: this.enqueueJobAfterFailure,
processJob: this.processJob
})
this.registerQueueJobProcessor(this.queue)

await this.startQueue(
this.queue,
Expand Down Expand Up @@ -83,11 +83,13 @@ class StateMonitoringQueue {
* @param {Object} params.queue the queue to register events for
* @param {Function<queue, successfulJob, jobResult>} params.jobSuccessCallback the function to call when a job succeeds
* @param {Function<queue, failedJob>} params.jobFailureCallback the function to call when a job fails
* @param {Function<job>} params.processJob the function to call when processing a job from the queue
*/
registerQueueEventHandlers({
registerQueueEventHandlersAndJobProcessor({
queue,
jobSuccessCallback,
jobFailureCallback
jobFailureCallback,
processJob
}) {
// Add handlers for logging
queue.on('global:waiting', (jobId) => {
Expand Down Expand Up @@ -127,6 +129,9 @@ class StateMonitoringQueue {
)
jobFailureCallback(queue, job)
})

// Register the logic that gets executed to process each new job from the queue
queue.process(1 /** concurrency */, processJob)
}

/**
Expand Down Expand Up @@ -173,55 +178,47 @@ class StateMonitoringQueue {
})
}

/**
* Registers the logic that gets executed to process each new job from the queue.
* @param {Object} queue the StateMonitoringQueue to consume jobs from
*/
registerQueueJobProcessor(queue) {
// Initialize queue job processor (aka consumer)
queue.process(1 /** concurrency */, async (job) => {
const {
id: jobId,
data: {
lastProcessedUserId,
discoveryNodeEndpoint,
moduloBase,
currentModuloSlice
}
} = job

try {
this.log(`New job details: jobId=${jobId}, job=${JSON.stringify(job)}`)
} catch (e) {
this.logError(`Failed to log details for jobId=${jobId}: ${e}`)
async processJob(job) {
const {
id: jobId,
data: {
lastProcessedUserId,
discoveryNodeEndpoint,
moduloBase,
currentModuloSlice
}
} = job

// Default results of this job will be passed to the next job, so default to failure
let result = {
try {
this.log(`New job details: jobId=${jobId}, job=${JSON.stringify(job)}`)
} catch (e) {
this.logError(`Failed to log details for jobId=${jobId}: ${e}`)
}

// Default results of this job will be passed to the next job, so default to failure
let result = {
lastProcessedUserId,
jobFailed: true,
moduloBase,
currentModuloSlice
}
try {
// TODO: Wire up metrics
// await redis.set('stateMachineQueueLatestJobStart', Date.now())
result = await processStateMonitoringJob(
jobId,
lastProcessedUserId,
jobFailed: true,
discoveryNodeEndpoint,
moduloBase,
currentModuloSlice
}
try {
// TODO: Wire up metrics
// await redis.set('stateMachineQueueLatestJobStart', Date.now())
result = await processStateMonitoringJob(
jobId,
lastProcessedUserId,
discoveryNodeEndpoint,
moduloBase,
currentModuloSlice
)
// TODO: Wire up metrics
// await redis.set('stateMachineQueueLatestJobSuccess', Date.now())
} catch (e) {
this.logError(`Error processing jobId ${jobId}: ${e}`)
console.log(e.stack)
}
)
// TODO: Wire up metrics
// await redis.set('stateMachineQueueLatestJobSuccess', Date.now())
} catch (e) {
this.logError(`Error processing jobId ${jobId}: ${e}`)
}

return result
})
return result
}

/**
Expand Down
Loading

0 comments on commit a6f0272

Please sign in to comment.