diff --git a/package.json b/package.json index 43b57b8381..60346d1d53 100644 --- a/package.json +++ b/package.json @@ -48,7 +48,7 @@ "cron-parser": "^4.2.1", "get-port": "^5.1.1", "glob": "^7.2.0", - "ioredis": "^4.28.2", + "ioredis": "^4.28.5", "lodash": "^4.17.21", "msgpackr": "^1.4.6", "semver": "^6.3.0", diff --git a/src/classes/redis-connection.ts b/src/classes/redis-connection.ts index 4a9e3958ae..68e4804d46 100644 --- a/src/classes/redis-connection.ts +++ b/src/classes/redis-connection.ts @@ -58,17 +58,25 @@ export class RedisConnection extends EventEmitter { if (this.blocking) { this.opts.maxRetriesPerRequest = null; } + + this.checkUpstashHost(this.opts.host); } else { this._client = opts; - this.opts = isRedisCluster(this._client) - ? this._client.options.redisOptions - : this._client.options; + + if (isRedisCluster(this._client)) { + const hosts = (this._client).startupNodes.map( + (node: { host: string }) => node.host, + ); + this.checkUpstashHost(this._client.options.redisOptions?.host || hosts); + } else { + this.opts = this._client.options; + + this.checkUpstashHost(this.opts.host); + } this.checkBlockingOptions(deprecationMessage, this.opts); } - this.checkUpstashHost(this.opts.host); - this.handleClientError = (err: Error): void => { this.emit('error', err); }; @@ -83,8 +91,11 @@ export class RedisConnection extends EventEmitter { } } - private checkUpstashHost(host: string | undefined) { - if (host?.endsWith('upstash.io')) { + private checkUpstashHost(host: string[] | string | undefined) { + const includesUpstash = Array.isArray(host) + ? host.some(node => node.endsWith('upstash.io')) + : host?.endsWith('upstash.io'); + if (includesUpstash) { throw new Error(upstashMessage); } } diff --git a/tests/test_connection.ts b/tests/test_connection.ts index 2e3653885e..a449784164 100644 --- a/tests/test_connection.ts +++ b/tests/test_connection.ts @@ -62,6 +62,43 @@ describe('connection', () => { 'BullMQ: Upstash is not compatible with BullMQ.', ); }); + + describe('when using Cluster instance', async () => { + it('throws an error', async () => { + const connection = new IORedis.Cluster([ + { + host: 'https://upstash.io', + }, + ]); + + expect(() => new QueueBase(queueName, { connection })).to.throw( + 'BullMQ: Upstash is not compatible with BullMQ.', + ); + await connection.disconnect(); + }); + }); + + describe('when using redisOptions', async () => { + it('throws an error', async () => { + const connection = new IORedis.Cluster( + [ + { + host: 'localhost', + }, + ], + { + redisOptions: { + host: 'https://upstash.io', + }, + }, + ); + + expect(() => new QueueBase(queueName, { connection })).to.throw( + 'BullMQ: Upstash is not compatible with BullMQ.', + ); + await connection.disconnect(); + }); + }); }); it('should recover from a connection loss', async () => { diff --git a/tests/test_obliterate.ts b/tests/test_obliterate.ts index e70195b0cb..de7e2d18b3 100644 --- a/tests/test_obliterate.ts +++ b/tests/test_obliterate.ts @@ -383,6 +383,7 @@ describe('Obliterate', function () { const worker = new Worker( queue.name, async job => { + await delay(100); return job.log('Lorem Ipsum Dolor Sit Amet'); }, { connection }, diff --git a/yarn.lock b/yarn.lock index f982c8c0c2..8f005c65c9 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3436,7 +3436,7 @@ into-stream@^6.0.0: from2 "^2.3.0" p-is-promise "^3.0.0" -ioredis@^4.28.2: +ioredis@^4.28.5: version "4.28.5" resolved "https://registry.yarnpkg.com/ioredis/-/ioredis-4.28.5.tgz#5c149e6a8d76a7f8fa8a504ffc85b7d5b6797f9f" integrity sha512-3GYo0GJtLqgNXj4YhrisLaNNvWSNwSS2wS4OELGfGxH8I69+XfNdnmV1AyN+ZqMh0i7eX+SWjrwFKDBDgfBC1A==