Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-6008): deprecate CloseOptions interface #4030

Merged
merged 21 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
refactor(NODE-5915): refactor topology close logic to be synchronous
  • Loading branch information
W-A-James committed Mar 5, 2024
commit c105a58572a492c4cf71683f9344f87327541767
6 changes: 1 addition & 5 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,14 +323,10 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
}, 1).unref(); // No need for this timer to hold the event loop open
}

public destroy(options: DestroyOptions, callback?: Callback): void {
public destroy(): void {
if (this.closed) {
if (typeof callback === 'function') process.nextTick(callback);
return;
}
if (typeof callback === 'function') {
this.once('close', () => process.nextTick(() => callback()));
}

// load balanced mode requires that these listeners remain on the connection
// after cleanup on timeouts, errors or close so we remove them before calling
Expand Down
36 changes: 13 additions & 23 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Server } from '../sdam/server';
import {
type Callback,
eachAsync,
List,
makeCounter,
promiseWithResolvers,
Expand Down Expand Up @@ -501,18 +500,15 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}

/** Close the pool */
close(callback: Callback<void>): void;
close(options: CloseOptions, callback: Callback<void>): void;
close(_options?: CloseOptions | Callback<void>, _cb?: Callback<void>): void {
close(_options?: CloseOptions): void {
let options = _options as CloseOptions;
const callback = (_cb ?? _options) as Callback<void>;
if (typeof options === 'function') {
options = {};
}

options = Object.assign({ force: false }, options);
if (this.closed) {
return callback();
return;
}

// immediately cancel any in-flight connections
Expand All @@ -527,21 +523,15 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
this.clearMinPoolSizeTimer();
this.processWaitQueue();

eachAsync<Connection>(
this[kConnections].toArray(),
(conn, cb) => {
this.emitAndLog(
ConnectionPool.CONNECTION_CLOSED,
new ConnectionClosedEvent(this, conn, 'poolClosed')
);
conn.destroy({ force: !!options.force }, cb);
},
err => {
this[kConnections].clear();
this.emitAndLog(ConnectionPool.CONNECTION_POOL_CLOSED, new ConnectionPoolClosedEvent(this));
callback(err);
}
);
for (const conn of this[kConnections]) {
this.emitAndLog(
ConnectionPool.CONNECTION_CLOSED,
new ConnectionClosedEvent(this, conn, 'poolClosed')
);
conn.destroy();
}
this[kConnections].clear();
this.emitAndLog(ConnectionPool.CONNECTION_POOL_CLOSED, new ConnectionPoolClosedEvent(this));
}

/**
Expand Down Expand Up @@ -593,7 +583,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
new ConnectionClosedEvent(this, connection, reason)
);
// destroy the connection
process.nextTick(() => connection.destroy({ force: false }));
process.nextTick(() => connection.destroy());
}

private connectionIsStale(connection: Connection) {
Expand Down Expand Up @@ -649,7 +639,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
// The pool might have closed since we started trying to create a connection
if (this[kPoolState] !== PoolState.ready) {
this[kPending]--;
connection.destroy({ force: true });
connection.destroy();
callback(this.closed ? new PoolClosedError(this) : new PoolClearedError(this));
return;
}
Expand Down
21 changes: 4 additions & 17 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import type { GetMoreOptions } from '../operations/get_more';
import type { ClientSession } from '../sessions';
import { isTransactionCommand } from '../transactions';
import {
type Callback,
type EventEmitterWithState,
makeStateMachine,
maxWireVersion,
Expand Down Expand Up @@ -236,18 +235,10 @@ export class Server extends TypedEventEmitter<ServerEvents> {
}

/** Destroy the server connection */
destroy(options?: DestroyOptions, callback?: Callback): void {
if (typeof options === 'function') {
callback = options;
options = { force: false };
}
destroy(options?: DestroyOptions): void {
options = Object.assign({}, { force: false }, options);

if (this.s.state === STATE_CLOSED) {
if (typeof callback === 'function') {
callback();
}

return;
}

Expand All @@ -257,13 +248,9 @@ export class Server extends TypedEventEmitter<ServerEvents> {
this.monitor?.close();
}

this.pool.close(options, err => {
stateTransition(this, STATE_CLOSED);
this.emit('closed');
if (typeof callback === 'function') {
callback(err);
}
});
this.pool.close(options);
stateTransition(this, STATE_CLOSED);
this.emit('closed');
}

/**
Expand Down
40 changes: 17 additions & 23 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -494,41 +494,35 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
}

/** Close this topology */
close(options: CloseOptions): void;
close(options: CloseOptions, callback: Callback): void;
close(options?: CloseOptions, callback?: Callback): void {
close(options?: CloseOptions): void {
options = options ?? { force: false };

if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) {
return callback?.();
return;
}

const destroyedServers = Array.from(this.s.servers.values(), server => {
return promisify(destroyServer)(server, this, { force: !!options?.force });
});
for (const server of this.s.servers.values()) {
destroyServer(server, this, { force: !!options?.force });
}

Promise.all(destroyedServers)
.then(() => {
this.s.servers.clear();
this.s.servers.clear();

stateTransition(this, STATE_CLOSING);
stateTransition(this, STATE_CLOSING);

drainWaitQueue(this[kWaitQueue], new MongoTopologyClosedError());
drainTimerQueue(this.s.connectionTimers);
drainWaitQueue(this[kWaitQueue], new MongoTopologyClosedError());
drainTimerQueue(this.s.connectionTimers);

if (this.s.srvPoller) {
this.s.srvPoller.stop();
this.s.srvPoller.removeListener(SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords);
}
if (this.s.srvPoller) {
this.s.srvPoller.stop();
this.s.srvPoller.removeListener(SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords);
}

this.removeListener(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology);
this.removeListener(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology);

stateTransition(this, STATE_CLOSED);
stateTransition(this, STATE_CLOSED);

// emit an event for close
this.emitAndLog(Topology.TOPOLOGY_CLOSED, new TopologyClosedEvent(this.s.id));
})
.finally(() => callback?.());
// emit an event for close
this.emitAndLog(Topology.TOPOLOGY_CLOSED, new TopologyClosedEvent(this.s.id));
}

/**
Expand Down
Loading