Skip to content

Commit

Permalink
Add tests for lib/fan-out-consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
eaviles committed Jun 4, 2019
1 parent ef4e970 commit 8af3763
Show file tree
Hide file tree
Showing 5 changed files with 618 additions and 11 deletions.
14 changes: 14 additions & 0 deletions lib/__mocks__/aws-sdk.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,16 @@ function createResponseMock(data = {}) {
});
}

const resolvePromise = jest.fn(() =>
Promise.resolve({
accessKeyId: 'resolved-access-key-id',
secretAccessKey: 'resolved-secret-access-key',
sessionToken: 'resolved-session-token'
})
);

const CredentialProviderChain = jest.fn(() => ({ resolvePromise }));

const createTable = createResponseMock();
const deleteMock = createResponseMock();
const describeTable = createResponseMock();
Expand Down Expand Up @@ -68,6 +78,9 @@ const Kinesis = jest.fn(({ endpoint } = {}) => {
});

function mockClear() {
resolvePromise.mockClear();
CredentialProviderChain.mockClear();

createTable.mockClear();
deleteMock.mockClear();
describeTable.mockClear();
Expand Down Expand Up @@ -98,6 +111,7 @@ function mockClear() {
}

module.exports = {
CredentialProviderChain,
DynamoDB,
Kinesis,
mockClear
Expand Down
39 changes: 39 additions & 0 deletions lib/__mocks__/got.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
'use strict';

const { Transform } = require('stream');

let hooks;
let response;

const abort = jest.fn();

const stream = jest.fn(() => {
response = new Transform({ objectMode: true });
setImmediate(() => {
const request = { abort };
response.emit('request', request);
});
return response;
});

const extend = jest.fn((...args) => {
[{ hooks }] = args;
return { stream };
});

function getMocks() {
return { abort, extend, response, stream };
}

function mockClear() {
hooks = {};
abort.mockClear();
extend.mockClear();
stream.mockClear();
}

function getHooks() {
return hooks;
}

module.exports = { extend, getHooks, getMocks, mockClear };
15 changes: 15 additions & 0 deletions lib/__mocks__/lifion-aws-event-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
'use strict';

const { Transform } = require('stream');

function Parser() {
return new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
this.push(chunk);
callback();
}
});
}

module.exports = { Parser };
142 changes: 131 additions & 11 deletions lib/fan-out-consumer.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
/**
* Module that implements an AWS enhanced fan-out consumer.
*
* @module fan-out-consumer
* @private
*/

'use strict';

const aws4 = require('aws4');
Expand All @@ -22,33 +29,91 @@ const asyncPipeline = promisify(pipeline);
const privateData = new WeakMap();
const wait = promisify(setTimeout);

/**
* Provides access to the private data of the specified instance.
*
* @param {Object} instance - The private data's owner.
* @returns {Object} The private data.
* @private
*/
function internal(instance) {
if (!privateData.has(instance)) privateData.set(instance, {});
return privateData.get(instance);
}

/**
* Class that implements a pre-processing stream used as a filter to a stream request to the
* shard subscription API. If the request is successful and the response is an event stream,
* chunks are passed to subsequent streams in the pipeline. If the server responds with an error,
* the error details are parsed and then thrown as an error, which breaks the entire pipeline.
*
* @extends external:Transform
* @memberof module:fan-out-consumer
* @private
*/
class PreProcess extends Transform {
/**
* Initializes an instance of the pre-processing stream.
*
* @param {Object} options - The initialization options.
* @param {Object} options.requestFlags - The object where the flags for the request are stored.
* @param {boolean} options.requestFlags.isEventStream - If the request is sucessful and the
* headers in the initial response point to an even stream, this flag is set to `true`.
* @param {number} options.requestFlags.statusCode - The status code of the last request response.
*/
constructor({ requestFlags }) {
super({ objectMode: true });
Object.assign(internal(this), { requestFlags });
}

/**
* The stream transformation logic.
*
* @param {Buffer} chunk - A chunk of data coming from the event stream.
* @param {string} encoding - The stream encoding mode (ignored)
* @param {Function} callback - The callback for more data.
*/
_transform(chunk, encoding, callback) {
const { requestFlags } = internal(this);
if (!requestFlags.isEventStream) {
const { statusCode } = requestFlags;
const { __type, message } = JSON.parse(chunk.toString('utf8'));
const err = new Error(message || 'Failed to subscribe to shard.');
if (__type) err.code = __type;
err.isRetryable = true;
this.emit('error', err);
const error = Object.assign(
new Error(message || 'Failed to subscribe to shard.'),
{ isRetryable: true },
__type && { code: __type },
statusCode && { statusCode }
);
this.emit('error', error);
} else {
this.push(chunk);
}
callback();
}
}

/**
* Class that implements a post-processing stream used to push records outside the internal
* stream pipeline. It also stores checkpoints as records arrive, and look for shard depletion.
*
* @extends external:Writable
* @memberof module:fan-out-consumer
* @private
*/
class PostProcess extends Writable {
/**
* Initializes an instance of the post-processing stream.
*
* @param {Object} options - The initialization options.
* @param {Function} options.abort - A function that will close the entire pipeline, called
* when no data has been pushed through the event stream on a given time window.
* @param {Object} options.logger - An instance of a logger.
* @param {Function} options.markShardAsDepleted - A function that will mark a given shard as
* depleted. Called when a shard depletion event has been detected.
* @param {Function} options.pushToStream - A function that pushes records out of the pipeline.
* @param {Function} options.setCheckpoint - A function that stores the checkpoint for the shard.
* @param {string} options.shardId - The ID of the shard.
*/
constructor({ abort, logger, markShardAsDepleted, pushToStream, setCheckpoint, shardId }) {
super({ objectMode: true });
Object.assign(internal(this), {
Expand All @@ -62,6 +127,13 @@ class PostProcess extends Writable {
});
}

/**
* The stream writable logic.
*
* @param {Object} chunk - A chunk of data coming from the pipeline.
* @param {string} encoding - The stream encoding mode (ignored)
* @param {Function} callback - The callback for more data.
*/
async _write(chunk, encoding, callback) {
const {
abort,
Expand Down Expand Up @@ -90,7 +162,29 @@ class PostProcess extends Writable {
}
}

/**
* Class that implements an AWS enhanced fan-out consumer.
*
* @alias module:fan-out-consumer
*/
class FanOutConsumer {
/**
* Initializes an instance of an enhanced fan-out consumer.
*
* @param {Object} options - The initialization options.
* @param {Object} options.awsOptions - The AWS.Kinesis options to use in the HTTP request.
* @param {string} options.checkpoint - The last-known checkpoint for the stream shard.
* @param {Object} options.client - An instance of the Kinesis client.
* @param {string} options.compression - The kind of data compression to use with records.
* @param {string} options.consumerArn - The ARN of the enhanced consumer as registered in AWS.
* @param {string} options.leaseExpiration - The timestamp of the shard lease expiration.
* @param {Object} options.logger - An instance of a logger.
* @param {Function} options.pushToStream - A function to push incoming records to the consumer.
* @param {string} options.shardId - The ID of the stream shard to subscribe for records.
* @param {Object} options.stateStore - An instance of the state store.
* @param {Function} options.stopConsumer - A function that stops this consumer from the manager.
* @param {string} options.streamName - The name of the Kinesis stream.
*/
constructor(options) {
const {
awsOptions,
Expand Down Expand Up @@ -145,6 +239,12 @@ class FanOutConsumer {
});
}

/**
* Starts the enhanced fan-out consumer by initializing the internal stream pipeline.
*
* @fulfil {undefined}
* @returns {Promise}
*/
async start() {
const privateProps = internal(this);
const {
Expand All @@ -161,7 +261,7 @@ class FanOutConsumer {
streamName
} = privateProps;

logger.debug(`Starting an enhanded fan-out subscriber for shard "${shardId}"…`);
logger.debug(`Starting an enhanced fan-out subscriber for shard "${shardId}"…`);

this.updateLeaseExpiration(leaseExpiration);

Expand All @@ -176,6 +276,7 @@ class FanOutConsumer {

const handleResponse = async res => {
const { headers, statusCode } = res;
requestFlags.statusCode = statusCode;
if (headers['content-type'] !== AWS_EVENT_STREAM || statusCode !== 200) {
logger.error(`Subscription unsuccessful: ${statusCode}`);
requestFlags.isEventStream = false;
Expand Down Expand Up @@ -248,12 +349,13 @@ class FanOutConsumer {
const { code, message, requestId, statusCode } = err;
if (!shouldBailRetry(err)) {
logger.warn(
`Trying to recover from AWS.Kinesis error…\n${[
`\t- Message: ${message}`,
`\t- Request ID: ${requestId}`,
`\t- Code: ${code} (${statusCode})`,
`\t- Stream: ${streamName}`
].join('\n')}`
[
'Trying to recover from AWS.Kinesis error…',
`- Message: ${message}`,
`- Request ID: ${requestId}`,
`- Code: ${code} (${statusCode})`,
`- Stream: ${streamName}`
].join('\n\t')
);
} else {
pushToStream(err);
Expand All @@ -266,6 +368,9 @@ class FanOutConsumer {
abort();
}

/**
* Stops the internal stream pipeline.
*/
stop() {
const privateProps = internal(this);
const { expirationTimeoutId, request } = privateProps;
Expand All @@ -278,6 +383,11 @@ class FanOutConsumer {
privateProps.expirationTimeoutId = null;
}

/**
* Updates the shard lease expiration timestamp.
*
* @param {string} leaseExpiration - The updated timestamp when the shard lease expires.
*/
updateLeaseExpiration(leaseExpiration) {
const privateProps = internal(this);
const { expirationTimeoutId, logger, shardId, stopConsumer } = privateProps;
Expand All @@ -297,4 +407,14 @@ class FanOutConsumer {
}
}

/**
* @external Transform
* @see https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_class_stream_transform
*/

/**
* @external Writable
* @see https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_class_stream_writable
*/

module.exports = FanOutConsumer;
Loading

0 comments on commit 8af3763

Please sign in to comment.