From 8667bdbd6f14cf2c10913ca1dc09261a22844069 Mon Sep 17 00:00:00 2001 From: Valeri Karpov Date: Thu, 29 Aug 2024 16:23:31 -0400 Subject: [PATCH 1/2] fix(cursor): throw error in ChangeStream constructor if `changeStreamThunk()` throws a sync error --- lib/cursor/changeStream.js | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/lib/cursor/changeStream.js b/lib/cursor/changeStream.js index 55cdecfcdc..508dd6e0a3 100644 --- a/lib/cursor/changeStream.js +++ b/lib/cursor/changeStream.js @@ -33,19 +33,34 @@ class ChangeStream extends EventEmitter { ); } + let syncError = null; this.$driverChangeStreamPromise = new Promise((resolve, reject) => { // This wrapper is necessary because of buffering. - changeStreamThunk((err, driverChangeStream) => { - if (err != null) { - this.emit('error', err); - return reject(err); - } + try { + changeStreamThunk((err, driverChangeStream) => { + if (err != null) { + this.emit('error', err); + return reject(err); + } - this.driverChangeStream = driverChangeStream; - this.emit('ready'); - resolve(); - }); + this.driverChangeStream = driverChangeStream; + this.emit('ready'); + resolve(); + }); + } catch (err) { + syncError = err; + this.emit('error', err); + reject(err); + } }); + + // Because a ChangeStream is an event emitter, there's no way to register an 'error' handler + // that catches errors which occur in the constructor, unless we force sync errors into async + // errors with setImmediate(). For cleaner stack trace, we just immediately throw any synchronous + // errors that occurred with changeStreamThunk(). + if (syncError != null) { + throw syncError; + } } _bindEvents() { From 9fa956fa6a03b2cc2231acc887c0084286da53ea Mon Sep 17 00:00:00 2001 From: Valeri Karpov Date: Thu, 29 Aug 2024 16:32:50 -0400 Subject: [PATCH 2/2] throw error if calling methods on change stream that errored out --- lib/cursor/changeStream.js | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/lib/cursor/changeStream.js b/lib/cursor/changeStream.js index 508dd6e0a3..b41e2379e8 100644 --- a/lib/cursor/changeStream.js +++ b/lib/cursor/changeStream.js @@ -5,6 +5,7 @@ */ const EventEmitter = require('events').EventEmitter; +const MongooseError = require('../error/mongooseError'); /*! * ignore @@ -25,6 +26,7 @@ class ChangeStream extends EventEmitter { this.bindedEvents = false; this.pipeline = pipeline; this.options = options; + this.errored = false; if (options && options.hydrate && !options.model) { throw new Error( @@ -39,6 +41,7 @@ class ChangeStream extends EventEmitter { try { changeStreamThunk((err, driverChangeStream) => { if (err != null) { + this.errored = true; this.emit('error', err); return reject(err); } @@ -49,6 +52,7 @@ class ChangeStream extends EventEmitter { }); } catch (err) { syncError = err; + this.errored = true; this.emit('error', err); reject(err); } @@ -107,10 +111,16 @@ class ChangeStream extends EventEmitter { } hasNext(cb) { + if (this.errored) { + throw new MongooseError('Cannot call hasNext() on errored ChangeStream'); + } return this.driverChangeStream.hasNext(cb); } next(cb) { + if (this.errored) { + throw new MongooseError('Cannot call next() on errored ChangeStream'); + } if (this.options && this.options.hydrate) { if (cb != null) { const originalCb = cb; @@ -141,16 +151,25 @@ class ChangeStream extends EventEmitter { } addListener(event, handler) { + if (this.errored) { + throw new MongooseError('Cannot call addListener() on errored ChangeStream'); + } this._bindEvents(); return super.addListener(event, handler); } on(event, handler) { + if (this.errored) { + throw new MongooseError('Cannot call on() on errored ChangeStream'); + } this._bindEvents(); return super.on(event, handler); } once(event, handler) { + if (this.errored) { + throw new MongooseError('Cannot call once() on errored ChangeStream'); + } this._bindEvents(); return super.once(event, handler); }