Skip to content

Commit

Permalink
Add pool.kill
Browse files Browse the repository at this point in the history
  • Loading branch information
cressie176 committed Aug 26, 2023
1 parent a24bdb6 commit 046eb49
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 151 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,14 @@ await pool.shutdown();

Shuts down the pool. After calling shutdown any inflight acquisition requests will be allowed to continue but new requests will be rejected. Once there are no inflight requests the remaining idle resources will be destroyed. The method blocks until all resources have been destroyed or until the shutdownTimeout expires. Calling shutdown repeatedly will yield an error.

### kill() : void

```js
await pool.kill();
```

Intended to assist unit testing. Aggresively kills the pool - any queued or in progress acquisition requests will be immediately rejected. No resources will be destroyed. All event listeners will be removed.

## Resource Management

### Revalidation
Expand Down
5 changes: 5 additions & 0 deletions lib/Errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ class OperationTimedout extends XPoolError {
static code = 'ERR_X-POOL_OPERATION_TIMEDOUT';
}

class OperationAborted extends XPoolError {
static code = 'ERR_X-POOL_OPERATION_ABORTED';
}

class PoolNotRunning extends XPoolError {
static code = 'ERR_X-POOL_NOT_RUNNING';
}
Expand All @@ -43,6 +47,7 @@ module.exports = {
XPoolError,
ConfigurationError,
OperationTimedout,
OperationAborted,
PoolNotRunning,
MaxQueueDepthExceeded,
ResourceCreationFailed,
Expand Down
24 changes: 18 additions & 6 deletions lib/Operations.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,11 @@ class XPoolOperation {
}

_emitEvent(code, payload) {
setImmediate(() => {
this._pool.emit(code, payload) | this._pool.emit(XPoolEvent, payload);
});
this._pool.emit(code, payload) | this._pool.emit(XPoolEvent, payload);
}

_emitError(code, payload) {
setImmediate(() => {
this._pool.emit(code, payload) | this._pool.emit(XPoolError, payload) | this._pool.emit(XPoolEvent, payload);
});
this._pool.emit(code, payload) | this._pool.emit(XPoolError, payload) | this._pool.emit(XPoolEvent, payload);
}
}

Expand Down Expand Up @@ -113,6 +109,21 @@ class ShutdownPoolOperation extends XPoolOperation {
}
}

class KillPoolOperation extends XPoolOperation {

static STARTED = 'X-POOL_KILL_POOL_STARTED';
static NOTICE = 'X-POOL_KILL_POOL_NOTICE';
static SUCCEEDED = 'X-POOL_KILL_POOL_SUCCEEDED';
static FAILED = 'X-POOL_KILL_POOL_FAILED';

constructor(pool) {
super(pool, {
started: ({ contextId }) => `[${contextId}] Killing pool`,
succeeded: ({ contextId, duration }) => `[${contextId}] Killed pool in ${duration}ms`,
});
}
}

class AcquireResourceOperation extends XPoolOperation {

static STARTED = 'X-POOL_ACQUIRE_RESOURCE_STARTED';
Expand Down Expand Up @@ -238,6 +249,7 @@ module.exports = {
XPoolOperation,
InitialisePoolOperation,
ShutdownPoolOperation,
KillPoolOperation,
AcquireResourceOperation,
CreateResourceOperation,
ValidateResourceOperation,
Expand Down
70 changes: 55 additions & 15 deletions lib/Pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ const { scheduler } = require('node:timers/promises');
const TimedTask = require('./TimedTask');
const State = require('./State');
const { validateFactory, validateBoolean, validateNumber } = require('./validation');
const { ResourceCreationFailed, ResourceValidationFailed, ResourceDestructionFailed, PoolNotRunning } = require('./Errors');
const { XPoolOperation, InitialisePoolOperation, AcquireResourceOperation, CreateResourceOperation, ValidateResourceOperation, ReleaseResourceOperation, WithResourceOperation, DestroyResourceOperation, EvictBadResourcesOperation, ShutdownPoolOperation, DestroySpareResourcesOperation } = require('./Operations');
const { ResourceCreationFailed, ResourceValidationFailed, ResourceDestructionFailed, PoolNotRunning, OperationAborted } = require('./Errors');
const { XPoolOperation, InitialisePoolOperation, AcquireResourceOperation, CreateResourceOperation, ValidateResourceOperation, ReleaseResourceOperation, WithResourceOperation, DestroyResourceOperation, EvictBadResourcesOperation, ShutdownPoolOperation, DestroySpareResourcesOperation, KillPoolOperation } = require('./Operations');

const DEFAULT_AUTO_START = false;
const DEFAULT_ACQUIRE_RETRY_INTERVAL = 100;
Expand Down Expand Up @@ -40,7 +40,7 @@ module.exports = class Pool extends EventEmitter {
}

async _initialiseWithoutTimeout(size) {
const task = { isAborted: () => false };
const task = { isOK: () => true };
return this._batchAquire(task, size);
}

Expand Down Expand Up @@ -107,8 +107,47 @@ module.exports = class Pool extends EventEmitter {
});
}

async kill() {
return new KillPoolOperation(this).run(async (op) => {
this._stopping = true;
this._rejectAcquireRequests(op);
this._abortAcquireTasks(op);
this._abortDestroyTasks(op);
this._state.nuke();
this.removeAllListeners();
});
}

_rejectAcquireRequests(op) {
const { queued } = this.stats();
op.notice(`Rejecting ${queued} queued acquire requests`);
new Array(queued).fill().forEach(() => {
const err = new OperationAborted('Acquire request aborted. The pool has been killed');
const { reject } = this._state.dequeueAcquireRequest();
reject(err);
});
}

_abortAcquireTasks(op) {
const { acquiring } = this.stats();
op.notice(`Aborting ${acquiring} acquire tasks`);
new Array(acquiring).fill().forEach(() => {
const task = this._state.dequeueAcquireTask();
task.abort();
});
}

_abortDestroyTasks(op) {
const { destroying } = this.stats();
op.notice(`Aborting ${destroying} destroy tasks`);
new Array(destroying).fill().forEach(() => {
const task = this._state.dequeueDestroyTask();
task.abort();
});
}

_assertRunning() {
if (this._stopping) throw new PoolNotRunning('The pool has been shutdown');
if (this._stopping) throw new PoolNotRunning('The pool is not running');
}

_createInitialiseTask(size) {
Expand All @@ -119,7 +158,7 @@ module.exports = class Pool extends EventEmitter {
async _batchAquire(task, size) {
const acquireResources = new Array(size).fill().map(async () => {
let resource;
while (!resource && !task.isAborted()) {
while (!resource && task.isOK()) {
try {
resource = await this.acquire();
} catch (err) {
Expand All @@ -137,12 +176,12 @@ module.exports = class Pool extends EventEmitter {
this._checkAcquireQueue(op);
}).then(() => this._acquireResource(op, task));

const onLateResolve = (resource) => {
const onLateResolve = (task, resource) => {
op.notice('Adding resource acquired after timeout');
this._state.addLateAcquiredResource(resource);
this._state.addLateAcquiredResource(resource, task);
};

return new TimedTask({ name: 'acquire', fn, timeout: this._acquireTimeout, onLateResolve });
return new TimedTask({ name: 'Acquire', fn, timeout: this._acquireTimeout, onLateResolve });
}

_queueAcquireRequest(request) {
Expand All @@ -158,15 +197,16 @@ module.exports = class Pool extends EventEmitter {
}

async _acquireResource(op, task) {
this._state.commenceAcquisition(task);
let resource;
while (!resource && !task.isAborted()) {
while (!resource && task.isOK()) {
resource = await this._obtainValidResource();
if (!resource) {
op.notice(`Retrying in ${this._acquireRetryInterval}ms`);
await this._delay(this._acquireRetryInterval);
}
}
if (!task.isAborted()) this._state.addAcquiredResource(resource);
if (task.isOK()) this._state.addAcquiredResource(resource, task);
return resource;
}

Expand Down Expand Up @@ -206,13 +246,13 @@ module.exports = class Pool extends EventEmitter {
}

async _destroyResource(op, resource) {
const task = this._createDestroyTask(op, resource);
try {
this._state.commenceDestruction(resource);
const task = this._createDestroyTask(op, resource);
this._state.commenceDestruction(task);
await task.execute();
this._state.completeDestruction(resource);
this._state.completeDestruction(task);
} catch (err) {
this._state.excludeBadResource(resource);
this._state.excludeBadResource(resource, task);
op.failed(err).end();
}
}
Expand All @@ -226,8 +266,8 @@ module.exports = class Pool extends EventEmitter {
}
};
const onLateResolve = () => {
op.notice('Discarding resource after timeout');
this._state.evictBadResource(resource);
op.notice('Discarded resource after timeout');
};
return new TimedTask({ name: 'Destroy', fn, timeout: this._destroyTimeout, onLateResolve });
}
Expand Down
64 changes: 45 additions & 19 deletions lib/State.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
const { validateNumber, validateUpperBoundary } = require('./validation');
const { MaxQueueDepthExceeded } = require('./Errors');

const PLACEHOLDER = { abort: () => {} };

module.exports = class State {

constructor(options) {
Expand All @@ -11,7 +13,7 @@ module.exports = class State {
this._maxQueueDepth = validateNumber('maxQueueDepth', options, false, 1) || Infinity;

this._queued = [];
this._acquiringCount = 0;
this._acquiring = [];
this._acquired = [];
this._idle = [];
this._destroying = [];
Expand All @@ -36,7 +38,7 @@ module.exports = class State {
}

get acquiring() {
return this._acquiringCount;
return this._acquiring.length;
}

get acquired() {
Expand All @@ -60,7 +62,7 @@ module.exports = class State {
}

get spare() {
return Math.max(0, this.idle - this.queued - this.acquiring);
return Math.max(0, this.idle - this.queued - this.acquiring - this.destroying);
}

get available() {
Expand All @@ -81,11 +83,16 @@ module.exports = class State {
}

dequeueAcquireRequest() {
this._acquiringCount++;
this._peakCount = Math.max(this._peakCount, this.size);
this._acquiring.push(PLACEHOLDER);
return this._queued.shift();
}

commenceAcquisition(task) {
this._removeItem(this._acquiring, PLACEHOLDER);
this._acquiring.push(task);
this._peakCount = Math.max(this._peakCount, this.size);
}

isEmpty() {
return this.queued + this.acquiring + this.acquired + this.destroying + this.idle + this.bad === 0;
}
Expand All @@ -102,45 +109,64 @@ module.exports = class State {
return this._idle.shift();
}

addAcquiredResource(resource) {
this._acquiringCount--;
addAcquiredResource(resource, task) {
this._removeItem(this._acquiring, task);
this._acquired.push(resource);
}

addLateAcquiredResource(resource) {
this._acquiringCount--;
addLateAcquiredResource(resource, task) {
this._removeItem(this._acquiring, task);
this._idle.push(resource);
}

dequeueAcquireTask() {
return this._acquiring.shift();
}

releaseAcquiredResource(resource) {
if (this._removeResource(this._acquired, resource)) this._idle.push(resource);
if (this._removeItem(this._acquired, resource)) this._idle.push(resource);
}

removeAcquiredResource(resource) {
this._removeResource(this._acquired, resource);
this._removeItem(this._acquired, resource);
}

commenceDestruction(task) {
this._destroying.push(task);
}

commenceDestruction(resource) {
this._destroying.push(resource);
completeDestruction(task) {
this._removeItem(this._destroying, task);
}

completeDestruction(resource) {
this._removeResource(this._destroying, resource);
dequeueDestroyTask() {
return this._destroying.shift();
}

excludeBadResource(resource) {
this._removeResource(this._destroying, resource);
excludeBadResource(resource, task) {
this._removeItem(this._destroying, task);
this._bad.push(resource);
}

evictBadResource(resource) {
this._removeResource(this._bad, resource);
this._removeItem(this._bad, resource);
}

evictBadResources() {
this._bad.length = 0;
}

nuke() {
this._maxSize = 0;
this._minSize = 0;
this._queued.length = 0;
this._acquiring.length = 0;
this._acquired.length = 0;
this._idle.length = 0;
this._destroying.length = 0;
this._bad.length = 0;
}

stats() {
return {
queued: this.queued,
Expand All @@ -155,7 +181,7 @@ module.exports = class State {
};
}

_removeResource(list, resource) {
_removeItem(list, resource) {
const index = list.indexOf(resource);
if (index < 0) return false;

Expand Down
Loading

0 comments on commit 046eb49

Please sign in to comment.