diff --git a/lib/cursor/changeStream.js b/lib/cursor/changeStream.js index 55cdecfcdc..b41e2379e8 100644 --- a/lib/cursor/changeStream.js +++ b/lib/cursor/changeStream.js @@ -5,6 +5,7 @@ */ const EventEmitter = require('events').EventEmitter; +const MongooseError = require('../error/mongooseError'); /*! * ignore @@ -25,6 +26,7 @@ class ChangeStream extends EventEmitter { this.bindedEvents = false; this.pipeline = pipeline; this.options = options; + this.errored = false; if (options && options.hydrate && !options.model) { throw new Error( @@ -33,19 +35,36 @@ class ChangeStream extends EventEmitter { ); } + let syncError = null; this.$driverChangeStreamPromise = new Promise((resolve, reject) => { // This wrapper is necessary because of buffering. - changeStreamThunk((err, driverChangeStream) => { - if (err != null) { - this.emit('error', err); - return reject(err); - } + try { + changeStreamThunk((err, driverChangeStream) => { + if (err != null) { + this.errored = true; + this.emit('error', err); + return reject(err); + } - this.driverChangeStream = driverChangeStream; - this.emit('ready'); - resolve(); - }); + this.driverChangeStream = driverChangeStream; + this.emit('ready'); + resolve(); + }); + } catch (err) { + syncError = err; + this.errored = true; + this.emit('error', err); + reject(err); + } }); + + // Because a ChangeStream is an event emitter, there's no way to register an 'error' handler + // that catches errors which occur in the constructor, unless we force sync errors into async + // errors with setImmediate(). For cleaner stack trace, we just immediately throw any synchronous + // errors that occurred with changeStreamThunk(). + if (syncError != null) { + throw syncError; + } } _bindEvents() { @@ -92,10 +111,16 @@ class ChangeStream extends EventEmitter { } hasNext(cb) { + if (this.errored) { + throw new MongooseError('Cannot call hasNext() on errored ChangeStream'); + } return this.driverChangeStream.hasNext(cb); } next(cb) { + if (this.errored) { + throw new MongooseError('Cannot call next() on errored ChangeStream'); + } if (this.options && this.options.hydrate) { if (cb != null) { const originalCb = cb; @@ -126,16 +151,25 @@ class ChangeStream extends EventEmitter { } addListener(event, handler) { + if (this.errored) { + throw new MongooseError('Cannot call addListener() on errored ChangeStream'); + } this._bindEvents(); return super.addListener(event, handler); } on(event, handler) { + if (this.errored) { + throw new MongooseError('Cannot call on() on errored ChangeStream'); + } this._bindEvents(); return super.on(event, handler); } once(event, handler) { + if (this.errored) { + throw new MongooseError('Cannot call once() on errored ChangeStream'); + } this._bindEvents(); return super.once(event, handler); }