Skip to content

Commit

Permalink
Save queue specific results and stats
Browse files Browse the repository at this point in the history
  • Loading branch information
mugli committed Oct 5, 2019
1 parent d72d95c commit 7d4fe45
Showing 1 changed file with 26 additions and 5 deletions.
31 changes: 26 additions & 5 deletions src/consumer-unit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,11 @@ export class ConsumerUnit {
this._log('Starting to process task', task);
this._totalTasks++;

// TODO: Update queue specific total processed stat
await this._redis.hincrby(defaultOptions.STAT, 'processed', 1);
await this._redis
.pipeline()
.hincrby(defaultOptions.STAT, 'processed', 1)
.hincrby(`${defaultOptions.STAT}:${this._QNAME}`, 'processed', 1)
.exec();

const metadata = { id: task.id, qname: this.qname, retryCount: task.retryCount, consumerName: this._name };
try {
Expand Down Expand Up @@ -366,6 +369,12 @@ export class ConsumerUnit {
.dequeue(this._QNAME, this._DEDUPSET, this._GRPNAME, task.id, task.dedupKey) // Remove from queue
.lpush(defaultOptions.RESULTLIST, resultVal)
.ltrim(defaultOptions.RESULTLIST, 0, <number>defaultOptions.queueOptions.maxResultListSize - 1)
.lpush(`${defaultOptions.RESULTLIST}:${this._QNAME}`, resultVal)
.ltrim(
`${defaultOptions.RESULTLIST}:${this._QNAME}`,
0,
<number>defaultOptions.queueOptions.maxIndividualQueueResultSize - 1
)
.exec();
}

Expand All @@ -391,28 +400,40 @@ export class ConsumerUnit {
.pipeline()
.requeue(this._QNAME, this._DEDUPSET, this._GRPNAME, task.id, task.dataString, task.dedupKey, task.retryCount)
.hincrby(defaultOptions.STAT, 'retries', 1)
.hincrby(`${defaultOptions.STAT}:${this._QNAME}`, 'retries', 1)
.exec();
// TODO: Update queue specific total retries stat
} else {
// Move to deadlist
await this._redis
.pipeline()
.dequeue(this._QNAME, this._DEDUPSET, this._GRPNAME, task.id, task.dedupKey) // Remove from queue
.lpush(defaultOptions.DEADLIST, info)
.ltrim(defaultOptions.DEADLIST, 0, <number>defaultOptions.queueOptions.maxDeadListSize - 1)
.lpush(`${defaultOptions.DEADLIST}:${this._QNAME}`, info)
.ltrim(
`${defaultOptions.DEADLIST}:${this._QNAME}`,
0,
<number>defaultOptions.queueOptions.maxIndividualQueueResultSize - 1
)
.hincrby(defaultOptions.STAT, 'dead', 1)
.hincrby(`${defaultOptions.STAT}:${this._QNAME}`, 'dead', 1)
.exec();
// TODO: Update queue specific total dead stat
}

// Add to failed list in all cases
await this._redis
.pipeline()
.lpush(defaultOptions.FAILEDLIST, info)
.ltrim(defaultOptions.FAILEDLIST, 0, <number>defaultOptions.queueOptions.maxFailedListSize - 1)
.lpush(`${defaultOptions.FAILEDLIST}:${this._QNAME}`, info)
.ltrim(
`${defaultOptions.FAILEDLIST}:${this._QNAME}`,
0,
<number>defaultOptions.queueOptions.maxIndividualQueueResultSize - 1
)
.hincrby(defaultOptions.STAT, 'failed', 1)
.hincrby(`${defaultOptions.STAT}:${this._QNAME}`, 'failed', 1)
.exec();
// TODO: Update queue specific total failed stat
}

_wrapWorkerFn(data: any, metadata: Metadata) {
Expand Down

0 comments on commit 7d4fe45

Please sign in to comment.