From ce0c6c9ea2d90184f52c30276f7d1608f3a58f83 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Fri, 22 Dec 2023 11:32:05 +0100 Subject: [PATCH] fix(worker): fix so that worker can be closed if Redis is down --- src/classes/redis-connection.ts | 17 ++++++++++++++--- src/classes/worker.ts | 7 +++++-- tests/test_connection.ts | 10 ++++++++++ 3 files changed, 29 insertions(+), 5 deletions(-) diff --git a/src/classes/redis-connection.ts b/src/classes/redis-connection.ts index c599ab9423..2111e8e514 100644 --- a/src/classes/redis-connection.ts +++ b/src/classes/redis-connection.ts @@ -41,6 +41,8 @@ export class RedisConnection extends EventEmitter { canDoubleTimeout: false, }; + status: 'initializing' | 'ready' | 'closing' | 'closed' = 'initializing'; + protected _client: RedisClient; private readonly opts: RedisOptions; @@ -210,6 +212,7 @@ export class RedisConnection extends EventEmitter { this._client.on('ready', this.handleClientReady); await RedisConnection.waitUntilReady(this._client); + this.loadCommands(); this.version = await this.getRedisVersion(); @@ -239,6 +242,8 @@ export class RedisConnection extends EventEmitter { canDoubleTimeout: !isRedisVersionLowerThan(this.version, '6.0.0'), }; + this.status = 'ready'; + return this._client; } @@ -280,11 +285,16 @@ export class RedisConnection extends EventEmitter { async close(): Promise { if (!this.closing) { + const status = this.status; + this.status = 'closing'; this.closing = true; try { - await this.initializing; - if (!this.shared) { - await this._client.quit(); + if (status === 'ready') { + // Not sure if we need to wait for this + await this.initializing; + if (!this.shared) { + await this._client.quit(); + } } } catch (error) { if (isNotConnectionError(error as Error)) { @@ -298,6 +308,7 @@ export class RedisConnection extends EventEmitter { decreaseMaxListeners(this._client, 3); this.removeAllListeners(); + this.status = 'closed'; } } } diff --git a/src/classes/worker.ts b/src/classes/worker.ts index a612583c2d..519f643bf9 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -810,7 +810,10 @@ export class Worker< this.abortDelayController?.abort(); - const client = await this.blockingConnection.client; + const client = + this.blockingConnection.status == 'ready' + ? await this.blockingConnection.client + : null; this.resume(); await Promise.resolve() @@ -832,7 +835,7 @@ export class Worker< }) .finally(() => clearTimeout(this.extendLocksTimer)) .finally(() => clearTimeout(this.stalledCheckTimer)) - .finally(() => client.disconnect()) + .finally(() => client && client.disconnect()) .finally(() => this.connection.close()) .finally(() => this.emit('closed')); this.closed = true; diff --git a/tests/test_connection.ts b/tests/test_connection.ts index a4a41f5967..79b18ee4f5 100644 --- a/tests/test_connection.ts +++ b/tests/test_connection.ts @@ -134,6 +134,16 @@ describe('connection', () => { }); }); + it('should close worker even if redis is down', async () => { + const connection = new IORedis('badhost', { maxRetriesPerRequest: null }); + connection.on('error', () => {}); + + const worker = new Worker('test', async () => {}, { connection, prefix }); + + worker.on('error', err => {}); + await worker.close(); + }); + it('should recover from a connection loss', async () => { let processor;