Skip to content

Commit

Permalink
feat: introduce an interruptable async interval timer
Browse files Browse the repository at this point in the history
This timer calls an async function on an interval, optionally
allowing the interval to be interrupted and execution to occur
sooner. Calls to interrupt the interval are debounced such that
only the first call to wake the timer is honored.
  • Loading branch information
mbroadst committed Jun 11, 2020
1 parent 7cab088 commit 21cbabd
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 4 deletions.
88 changes: 87 additions & 1 deletion lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,7 @@ function maybePromise(callback, wrapper) {
function databaseNamespace(ns) {
return ns.split('.')[0];
}

function collectionNamespace(ns) {
return ns
.split('.')
Expand Down Expand Up @@ -915,6 +916,89 @@ function emitDeprecatedOptionWarning(options, list) {
});
}

function now() {
const hrtime = process.hrtime();
return Math.floor(hrtime[0] * 1000 + hrtime[1] / 1000000);
}

/**
* Creates an interval timer which is able to be woken up sooner than
* the interval. The timer will also debounce multiple calls to wake
* ensuring that the function is only ever called once within a minimum
* interval window.
*
* @param {function} fn An async function to run on an interval, must accept a `callback` as its only parameter
* @param {object} [options] Optional settings
* @param {number} [options.interval] The interval at which to run the provided function
* @param {number} [options.minInterval] The minimum time which must pass between invocations of the provided function
* @param {boolean} [options.immediate] Execute the function immediately when the interval is started
*/
function makeInterruptableAsyncInterval(fn, options) {
let timerId;
let lastCallTime;
let lastWakeTime;
let stopped = false;

options = options || {};
const interval = options.interval || 1000;
const minInterval = options.minInterval || 500;
const immediate = typeof options.immediate === 'boolean' ? options.immediate : false;

function wake() {
const currentTime = now();
const timeSinceLastWake = currentTime - lastWakeTime;
const timeSinceLastCall = currentTime - lastCallTime;
const timeUntilNextCall = Math.max(interval - timeSinceLastCall, 0);
lastWakeTime = currentTime;

// debounce multiple calls to wake within the `minInterval`
if (timeSinceLastWake < minInterval) {
return;
}

// reschedule a call as soon as possible, ensuring the call never happens
// faster than the `minInterval`
if (timeUntilNextCall > minInterval) {
reschedule(minInterval);
}
}

function stop() {
stopped = true;
if (timerId) {
clearTimeout(timerId);
timerId = null;
}

lastCallTime = 0;
lastWakeTime = 0;
}

function reschedule(ms) {
if (stopped) return;
clearTimeout(timerId);
timerId = setTimeout(executeAndReschedule, ms || interval);
}

function executeAndReschedule() {
lastWakeTime = 0;
lastCallTime = now();
fn(err => {
if (err) throw err;
reschedule(interval);
});
}

if (immediate) {
executeAndReschedule();
} else {
lastCallTime = now();
reschedule();
}

return { wake, stop };
}

module.exports = {
filterOptions,
mergeOptions,
Expand Down Expand Up @@ -957,5 +1041,7 @@ module.exports = {
errorStrictEqual,
makeStateMachine,
makeClientMetadata,
noop
noop,
now,
makeInterruptableAsyncInterval
};
78 changes: 75 additions & 3 deletions test/unit/utils.test.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
'use strict';
const { eachAsync } = require('../../lib/utils');
const expect = require('chai').expect;
const { eachAsync, now, makeInterruptableAsyncInterval } = require('../../lib/utils');
const { expect } = require('chai');

describe('utils', function() {
describe('eachAsync', function() {
context('eachAsync', function() {
it('should callback with an error', function(done) {
eachAsync(
[{ error: false }, { error: true }],
Expand Down Expand Up @@ -33,4 +33,76 @@ describe('utils', function() {
done();
});
});

context('makeInterruptableAsyncInterval', function() {
const roundToNearestMultipleOfTen = x => Math.floor(x / 10) * 10;

it('should execute a method in an repeating interval', function(done) {
let lastTime = now();
const marks = [];
const executor = makeInterruptableAsyncInterval(
callback => {
marks.push(now() - lastTime);
lastTime = now();
callback();
},
{ interval: 10 }
);

setTimeout(() => {
const roundedMarks = marks.map(roundToNearestMultipleOfTen);
expect(roundedMarks.every(mark => roundedMarks[0] === mark)).to.be.true;
executor.stop();
done();
}, 50);
});

it('should schedule execution sooner if requested within min interval threshold', function(done) {
let lastTime = now();
const marks = [];
const executor = makeInterruptableAsyncInterval(
callback => {
marks.push(now() - lastTime);
lastTime = now();
callback();
},
{ interval: 50, minInterval: 10 }
);

// immediately schedule execution
executor.wake();

setTimeout(() => {
const roundedMarks = marks.map(roundToNearestMultipleOfTen);
expect(roundedMarks[0]).to.equal(10);
executor.stop();
done();
}, 50);
});

it('should debounce multiple requests to wake the interval sooner', function(done) {
let lastTime = now();
const marks = [];
const executor = makeInterruptableAsyncInterval(
callback => {
marks.push(now() - lastTime);
lastTime = now();
callback();
},
{ interval: 50, minInterval: 10 }
);

for (let i = 0; i < 100; ++i) {
executor.wake();
}

setTimeout(() => {
const roundedMarks = marks.map(roundToNearestMultipleOfTen);
expect(roundedMarks[0]).to.equal(10);
expect(roundedMarks.slice(1).every(mark => mark === 50)).to.be.true;
executor.stop();
done();
}, 250);
});
});
});

0 comments on commit 21cbabd

Please sign in to comment.