Skip to content

Commit

Permalink
refactor(NODE-5063): make DestroyOptions required on connection.destr…
Browse files Browse the repository at this point in the history
…oy (#3568)
  • Loading branch information
baileympearson authored Feb 14, 2023
1 parent 9ce0bcc commit 666f01c
Show file tree
Hide file tree
Showing 12 changed files with 60 additions and 94 deletions.
2 changes: 1 addition & 1 deletion src/cmap/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ function performInitialHandshake(
) {
const callback: Callback<Document> = function (err, ret) {
if (err && conn) {
conn.destroy();
conn.destroy({ force: false });
}
_callback(err, ret);
};
Expand Down
10 changes: 2 additions & 8 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ export interface ConnectionOptions
/** @internal */
export interface DestroyOptions {
/** Force the destruction. */
force?: boolean;
force: boolean;
}

/** @public */
Expand Down Expand Up @@ -443,16 +443,10 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
callback(undefined, message.documents[0]);
}

destroy(options?: DestroyOptions, callback?: Callback): void {
if (typeof options === 'function') {
callback = options;
options = { force: false };
}

destroy(options: DestroyOptions, callback?: Callback): void {
this.removeAllListeners(Connection.PINNED);
this.removeAllListeners(Connection.UNPINNED);

options = Object.assign({ force: false }, options);
if (this[kStream] == null || this.destroyed) {
this.destroyed = true;
if (typeof callback === 'function') {
Expand Down
4 changes: 2 additions & 2 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
ConnectionPool.CONNECTION_CLOSED,
new ConnectionClosedEvent(this, conn, 'poolClosed')
);
conn.destroy(options, cb);
conn.destroy({ force: !!options.force }, cb);
},
err => {
this[kConnections].clear();
Expand Down Expand Up @@ -586,7 +586,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
new ConnectionClosedEvent(this, connection, reason)
);
// destroy the connection
process.nextTick(() => connection.destroy());
process.nextTick(() => connection.destroy({ force: false }));
}

private connectionIsStale(connection: Connection) {
Expand Down
5 changes: 4 additions & 1 deletion src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,10 @@ export class Server extends TypedEventEmitter<ServerEvents> {

/** Destroy the server connection */
destroy(options?: DestroyOptions, callback?: Callback): void {
if (typeof options === 'function') (callback = options), (options = {});
if (typeof options === 'function') {
callback = options;
options = { force: false };
}
options = Object.assign({}, { force: false }, options);

if (this.s.state === STATE_CLOSED) {
Expand Down
17 changes: 4 additions & 13 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -466,26 +466,17 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
}

/** Close this topology */
close(callback: Callback): void;
close(options: CloseOptions): void;
close(options: CloseOptions, callback: Callback): void;
close(options?: CloseOptions | Callback, callback?: Callback): void {
if (typeof options === 'function') {
callback = options;
options = {};
}

if (typeof options === 'boolean') {
options = { force: options };
}
options = options ?? {};
close(options?: CloseOptions, callback?: Callback): void {
options = options ?? { force: false };

if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) {
return callback?.();
}

const destroyedServers = Array.from(this.s.servers.values(), server => {
return promisify(destroyServer)(server, this, options as CloseOptions);
return promisify(destroyServer)(server, this, { force: !!options?.force });
});

Promise.all(destroyedServers)
Expand Down Expand Up @@ -740,7 +731,7 @@ function destroyServer(
options?: DestroyOptions,
callback?: Callback
) {
options = options ?? {};
options = options ?? { force: false };
for (const event of LOCAL_SERVER_EVENTS) {
server.removeAllListeners(event);
}
Expand Down
70 changes: 20 additions & 50 deletions test/integration/crud/misc_cursors.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const sinon = require('sinon');
const { Writable } = require('stream');
const { once, on } = require('events');
const { setTimeout } = require('timers');
const { ReadPreference } = require('../../mongodb');
const { ReadPreference, MongoExpiredSessionError } = require('../../mongodb');
const { ServerType } = require('../../mongodb');
const { formatSort } = require('../../mongodb');
const { getSymbolFrom } = require('../../tools/utils');
Expand Down Expand Up @@ -1852,61 +1852,31 @@ describe('Cursor', function () {
}
});

it('should close dead tailable cursors', {
metadata: {
os: '!win32' // NODE-2943: timeout on windows
},

test: function (done) {
// http://www.mongodb.org/display/DOCS/Tailable+Cursors

const configuration = this.configuration;
client.connect((err, client) => {
expect(err).to.not.exist;
this.defer(() => client.close());

const db = client.db(configuration.db);
const options = { capped: true, size: 10000000 };
db.createCollection(
'test_if_dead_tailable_cursors_close',
options,
function (err, collection) {
expect(err).to.not.exist;
it('closes cursors when client is closed even if it has not been exhausted', async function () {
await client
.db()
.dropCollection('test_cleanup_tailable')
.catch(() => null);

let closeCount = 0;
const docs = Array.from({ length: 100 }).map(() => ({ a: 1 }));
collection.insertMany(docs, { w: 'majority', wtimeoutMS: 5000 }, err => {
expect(err).to.not.exist;

const cursor = collection.find({}, { tailable: true, awaitData: true });
const stream = cursor.stream();
const collection = await client
.db()
.createCollection('test_cleanup_tailable', { capped: true, size: 1000, max: 3 });

stream.resume();

var validator = () => {
closeCount++;
if (closeCount === 2) {
done();
}
};
// insert only 2 docs in capped coll of 3
await collection.insertMany([{ a: 1 }, { a: 1 }]);

// we validate that the stream "ends" either cleanly or with an error
stream.on('end', validator);
stream.on('error', validator);
const cursor = collection.find({}, { tailable: true, awaitData: true, maxAwaitTimeMS: 2000 });

cursor.on('close', validator);
await cursor.next();
await cursor.next();
// will block for maxAwaitTimeMS (except we are closing the client)
const rejectedEarlyBecauseClientClosed = cursor.next().catch(error => error);

const docs = Array.from({ length: 100 }).map(() => ({ a: 1 }));
collection.insertMany(docs, err => {
expect(err).to.not.exist;
await client.close();
expect(cursor).to.have.property('killed', true);

setTimeout(() => client.close());
});
});
}
);
});
}
const error = await rejectedEarlyBecauseClientClosed;
expect(error).to.be.instanceOf(MongoExpiredSessionError);
});

it('shouldAwaitData', {
Expand Down
18 changes: 13 additions & 5 deletions test/integration/node-specific/topology.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,20 @@ describe('Topology', function () {
const states = [];
topology.on('stateChanged', (_, newState) => states.push(newState));
topology.connect(err => {
expect(err).to.not.exist;
topology.close(err => {
try {
expect(err).to.not.exist;
expect(topology.isDestroyed()).to.be.true;
expect(states).to.eql(['connecting', 'connected', 'closing', 'closed']);
done();
} catch (error) {
done(error);
}
topology.close({}, err => {
try {
expect(err).to.not.exist;
expect(topology.isDestroyed()).to.be.true;
expect(states).to.eql(['connecting', 'connected', 'closing', 'closed']);
done();
} catch (error) {
done(error);
}
});
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ describe('Polling Srv Records for Mongos Discovery', () => {

afterEach(function (done) {
if (context.topology) {
context.topology.close(done);
context.topology.close({}, done);
} else {
done();
}
Expand Down
2 changes: 1 addition & 1 deletion test/unit/assorted/server_selection_spec_helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ function executeServerSelectionTest(testDefinition, testDone) {
});

function done(err) {
topology.close(e => testDone(e || err));
topology.close({}, e => testDone(e || err));
}

topology.connect(err => {
Expand Down
2 changes: 1 addition & 1 deletion test/unit/error.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ describe('MongoErrors', () => {

makeAndConnectReplSet((err, topology) => {
// cleanup the server before calling done
const cleanup = err => topology.close(err2 => done(err || err2));
const cleanup = err => topology.close({}, err2 => done(err || err2));

if (err) {
return cleanup(err);
Expand Down
4 changes: 2 additions & 2 deletions test/unit/sdam/monitor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ describe('monitoring', function () {
const serverDescription = Array.from(topology.description.servers.values())[0];
expect(serverDescription).property('roundTripTime').to.be.greaterThan(0);

topology.close(done as any);
topology.close({}, done as any);
}, 500);
});
}).skipReason = 'TODO(NODE-3819): Unskip flaky tests';
Expand Down Expand Up @@ -96,7 +96,7 @@ describe('monitoring', function () {
const serverDescription = Array.from(topology.description.servers.values())[0];
expect(serverDescription).property('roundTripTime').to.be.greaterThan(0);

topology.close(done);
topology.close({}, done);
});
}).skipReason = 'TODO(NODE-3600): Unskip flaky tests';

Expand Down
18 changes: 9 additions & 9 deletions test/unit/sdam/topology.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ describe('Topology (unit)', function () {
}

if (topology) {
topology.close();
topology.close({});
}
});

Expand Down Expand Up @@ -107,7 +107,7 @@ describe('Topology (unit)', function () {

topology.connect(() => {
expect(topology.shouldCheckForSessionSupport()).to.be.true;
topology.close(done);
topology.close({}, done);
});
});

Expand All @@ -127,7 +127,7 @@ describe('Topology (unit)', function () {

topology.connect(() => {
expect(topology.shouldCheckForSessionSupport()).to.be.false;
topology.close(done);
topology.close({}, done);
});
});

Expand All @@ -147,7 +147,7 @@ describe('Topology (unit)', function () {

topology.connect(() => {
expect(topology.shouldCheckForSessionSupport()).to.be.false;
topology.close(done);
topology.close({}, done);
});
});
});
Expand Down Expand Up @@ -182,7 +182,7 @@ describe('Topology (unit)', function () {
expect(err).to.exist;
expect(err).to.match(/timed out/);

topology.close(done);
topology.close({}, done);
});
});
});
Expand Down Expand Up @@ -325,7 +325,7 @@ describe('Topology (unit)', function () {
expect(err).to.exist;
expect(err).to.eql(serverDescription.error);
expect(poolCleared).to.be.false;
topology.close(done);
topology.close({}, done);
});
});
});
Expand Down Expand Up @@ -467,7 +467,7 @@ describe('Topology (unit)', function () {

it('should clean up listeners on close', function (done) {
topology.s.state = 'connected'; // fake state to test clean up logic
topology.close(e => {
topology.close({}, e => {
const srvPollerListeners = topology.s.srvPoller.listeners(
SrvPoller.SRV_RECORD_DISCOVERY
);
Expand Down Expand Up @@ -547,7 +547,7 @@ describe('Topology (unit)', function () {
// occurs `requestCheck` will be called for an immediate check.
expect(requestCheck).property('callCount').to.equal(1);

topology.close(done);
topology.close({}, done);
});
});
});
Expand All @@ -559,7 +559,7 @@ describe('Topology (unit)', function () {
this.emit('connect');
});

topology.close(() => {
topology.close({}, () => {
topology.selectServer(ReadPreference.primary, { serverSelectionTimeoutMS: 2000 }, err => {
expect(err).to.exist;
expect(err).to.match(/Topology is closed/);
Expand Down

0 comments on commit 666f01c

Please sign in to comment.