Skip to content

Commit

Permalink
refactor(NODE-5675): refactor server selection and connection checkou…
Browse files Browse the repository at this point in the history
…t to use abort signals for timeout management (#3890)
  • Loading branch information
baileympearson authored Oct 17, 2023
1 parent 4ff8080 commit db90293
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 57 deletions.
56 changes: 26 additions & 30 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import {
} from '../error';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Server } from '../sdam/server';
import { type Callback, eachAsync, List, makeCounter } from '../utils';
import { type Callback, eachAsync, List, makeCounter, TimeoutController } from '../utils';
import { AUTH_PROVIDERS, connect } from './connect';
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
import {
Expand Down Expand Up @@ -101,7 +101,7 @@ export interface ConnectionPoolOptions extends Omit<ConnectionOptions, 'id' | 'g
/** @internal */
export interface WaitQueueMember {
callback: Callback<Connection>;
timer?: NodeJS.Timeout;
timeoutController: TimeoutController;
[kCancelled]?: boolean;
}

Expand Down Expand Up @@ -356,27 +356,29 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
new ConnectionCheckOutStartedEvent(this)
);

const waitQueueMember: WaitQueueMember = { callback };
const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
if (waitQueueTimeoutMS) {
waitQueueMember.timer = setTimeout(() => {
waitQueueMember[kCancelled] = true;
waitQueueMember.timer = undefined;

this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, 'timeout')
);
waitQueueMember.callback(
new WaitQueueTimeoutError(
this.loadBalanced
? this.waitQueueErrorMetrics()
: 'Timed out while checking out a connection from connection pool',
this.address
)
);
}, waitQueueTimeoutMS);
}
const waitQueueMember: WaitQueueMember = {
callback,
timeoutController: new TimeoutController(waitQueueTimeoutMS)
};
waitQueueMember.timeoutController.signal.addEventListener('abort', () => {
waitQueueMember[kCancelled] = true;
waitQueueMember.timeoutController.clear();

this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, 'timeout')
);
waitQueueMember.callback(
new WaitQueueTimeoutError(
this.loadBalanced
? this.waitQueueErrorMetrics()
: 'Timed out while checking out a connection from connection pool',
this.address
)
);
});

this[kWaitQueue].push(waitQueueMember);
process.nextTick(() => this.processWaitQueue());
Expand Down Expand Up @@ -831,9 +833,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, reason, error)
);
if (waitQueueMember.timer) {
clearTimeout(waitQueueMember.timer);
}
waitQueueMember.timeoutController.clear();
this[kWaitQueue].shift();
waitQueueMember.callback(error);
continue;
Expand All @@ -854,9 +854,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
ConnectionPool.CONNECTION_CHECKED_OUT,
new ConnectionCheckedOutEvent(this, connection)
);
if (waitQueueMember.timer) {
clearTimeout(waitQueueMember.timer);
}
waitQueueMember.timeoutController.clear();

this[kWaitQueue].shift();
waitQueueMember.callback(undefined, connection);
Expand Down Expand Up @@ -893,9 +891,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
);
}

if (waitQueueMember.timer) {
clearTimeout(waitQueueMember.timer);
}
waitQueueMember.timeoutController.clear();
waitQueueMember.callback(err, connection);
}
process.nextTick(() => this.processWaitQueue());
Expand Down
3 changes: 2 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ export type {
HostAddress,
List,
MongoDBCollectionNamespace,
MongoDBNamespace
MongoDBNamespace,
TimeoutController
} from './utils';
export type { W, WriteConcernOptions, WriteConcernSettings } from './write_concern';
42 changes: 17 additions & 25 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { clearTimeout, setTimeout } from 'timers';
import { promisify } from 'util';

import type { BSONSerializeOptions, Document } from '../bson';
Expand Down Expand Up @@ -43,7 +42,8 @@ import {
List,
makeStateMachine,
ns,
shuffle
shuffle,
TimeoutController
} from '../utils';
import {
_advanceClusterTime,
Expand Down Expand Up @@ -94,8 +94,8 @@ export interface ServerSelectionRequest {
serverSelector: ServerSelector;
transaction?: Transaction;
callback: ServerSelectionCallback;
timer?: NodeJS.Timeout;
[kCancelled]?: boolean;
timeoutController: TimeoutController;
}

/** @internal */
Expand Down Expand Up @@ -556,22 +556,20 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
const waitQueueMember: ServerSelectionRequest = {
serverSelector,
transaction,
callback
callback,
timeoutController: new TimeoutController(options.serverSelectionTimeoutMS)
};

const serverSelectionTimeoutMS = options.serverSelectionTimeoutMS;
if (serverSelectionTimeoutMS) {
waitQueueMember.timer = setTimeout(() => {
waitQueueMember[kCancelled] = true;
waitQueueMember.timer = undefined;
const timeoutError = new MongoServerSelectionError(
`Server selection timed out after ${serverSelectionTimeoutMS} ms`,
this.description
);
waitQueueMember.timeoutController.signal.addEventListener('abort', () => {
waitQueueMember[kCancelled] = true;
waitQueueMember.timeoutController.clear();
const timeoutError = new MongoServerSelectionError(
`Server selection timed out after ${options.serverSelectionTimeoutMS} ms`,
this.description
);

waitQueueMember.callback(timeoutError);
}, serverSelectionTimeoutMS);
}
waitQueueMember.callback(timeoutError);
});

this[kWaitQueue].push(waitQueueMember);
processWaitQueue(this);
Expand Down Expand Up @@ -842,9 +840,7 @@ function drainWaitQueue(queue: List<ServerSelectionRequest>, err?: MongoDriverEr
continue;
}

if (waitQueueMember.timer) {
clearTimeout(waitQueueMember.timer);
}
waitQueueMember.timeoutController.clear();

if (!waitQueueMember[kCancelled]) {
waitQueueMember.callback(err);
Expand Down Expand Up @@ -878,9 +874,7 @@ function processWaitQueue(topology: Topology) {
? serverSelector(topology.description, serverDescriptions)
: serverDescriptions;
} catch (e) {
if (waitQueueMember.timer) {
clearTimeout(waitQueueMember.timer);
}
waitQueueMember.timeoutController.clear();

waitQueueMember.callback(e);
continue;
Expand Down Expand Up @@ -917,9 +911,7 @@ function processWaitQueue(topology: Topology) {
transaction.pinServer(selectedServer);
}

if (waitQueueMember.timer) {
clearTimeout(waitQueueMember.timer);
}
waitQueueMember.timeoutController.clear();

waitQueueMember.callback(undefined, selectedServer);
}
Expand Down
28 changes: 28 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as crypto from 'crypto';
import type { SrvRecord } from 'dns';
import * as http from 'http';
import { clearTimeout, setTimeout } from 'timers';
import * as url from 'url';
import { URL } from 'url';

Expand Down Expand Up @@ -1254,3 +1255,30 @@ export async function request(
req.end();
});
}

/**
* A custom AbortController that aborts after a specified timeout.
*
* If `timeout` is undefined or \<=0, the abort controller never aborts.
*
* This class provides two benefits over the built-in AbortSignal.timeout() method.
* - This class provides a mechanism for cancelling the timeout
* - This class supports infinite timeouts by interpreting a timeout of 0 as infinite. This is
* consistent with existing timeout options in the Node driver (serverSelectionTimeoutMS, for example).
* @internal
*/
export class TimeoutController extends AbortController {
constructor(
timeout = 0,
private timeoutId = timeout > 0 ? setTimeout(() => this.abort(), timeout) : null
) {
super();
}

clear() {
if (this.timeoutId != null) {
clearTimeout(this.timeoutId);
}
this.timeoutId = null;
}
}
66 changes: 65 additions & 1 deletion test/unit/utils.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { expect } from 'chai';
import * as sinon from 'sinon';

import {
BufferPool,
Expand All @@ -16,8 +17,10 @@ import {
MongoDBNamespace,
MongoRuntimeError,
ObjectId,
shuffle
shuffle,
TimeoutController
} from '../mongodb';
import { createTimerSandbox } from './timer_sandbox';

describe('driver utils', function () {
describe('.hostMatchesWildcards', function () {
Expand Down Expand Up @@ -1101,4 +1104,65 @@ describe('driver utils', function () {
});
});
});

describe('class TimeoutController', () => {
let timerSandbox, clock, spy;

beforeEach(function () {
timerSandbox = createTimerSandbox();
clock = sinon.useFakeTimers();
spy = sinon.spy();
});

afterEach(function () {
clock.restore();
timerSandbox.restore();
});

describe('constructor', () => {
it('when no timeout is provided, it creates an infinite timeout', () => {
const controller = new TimeoutController();
// @ts-expect-error Accessing a private field on TimeoutController
expect(controller.timeoutId).to.be.null;
});

it('when timeout is 0, it creates an infinite timeout', () => {
const controller = new TimeoutController(0);
// @ts-expect-error Accessing a private field on TimeoutController
expect(controller.timeoutId).to.be.null;
});

it('when timeout <0, it creates an infinite timeout', () => {
const controller = new TimeoutController(-5);
// @ts-expect-error Accessing a private field on TimeoutController
expect(controller.timeoutId).to.be.null;
});

context('when timeout > 0', () => {
let timeoutController: TimeoutController;

beforeEach(function () {
timeoutController = new TimeoutController(3000);
timeoutController.signal.addEventListener('abort', spy);
});

afterEach(function () {
timeoutController.clear();
});

it('it creates a timeout', () => {
// @ts-expect-error Accessing a private field on TimeoutController
expect(timeoutController.timeoutId).not.to.be.null;
});

it('times out after `timeout` milliseconds', () => {
expect(spy, 'spy was called after creation').not.to.have.been.called;
clock.tick(2999);
expect(spy, 'spy was called before 3000ms has expired').not.to.have.been.called;
clock.tick(1);
expect(spy, 'spy was not called after 3000ms').to.have.been.called;
});
});
});
});
});

0 comments on commit db90293

Please sign in to comment.