diff --git a/lib/__mocks__/aws-sdk.js b/lib/__mocks__/aws-sdk.js index 5263a96f..3245171d 100644 --- a/lib/__mocks__/aws-sdk.js +++ b/lib/__mocks__/aws-sdk.js @@ -6,6 +6,16 @@ function createResponseMock(data = {}) { }); } +const resolvePromise = jest.fn(() => + Promise.resolve({ + accessKeyId: 'resolved-access-key-id', + secretAccessKey: 'resolved-secret-access-key', + sessionToken: 'resolved-session-token' + }) +); + +const CredentialProviderChain = jest.fn(() => ({ resolvePromise })); + const createTable = createResponseMock(); const deleteMock = createResponseMock(); const describeTable = createResponseMock(); @@ -68,6 +78,9 @@ const Kinesis = jest.fn(({ endpoint } = {}) => { }); function mockClear() { + resolvePromise.mockClear(); + CredentialProviderChain.mockClear(); + createTable.mockClear(); deleteMock.mockClear(); describeTable.mockClear(); @@ -98,6 +111,7 @@ function mockClear() { } module.exports = { + CredentialProviderChain, DynamoDB, Kinesis, mockClear diff --git a/lib/__mocks__/got.js b/lib/__mocks__/got.js new file mode 100644 index 00000000..2192719b --- /dev/null +++ b/lib/__mocks__/got.js @@ -0,0 +1,39 @@ +'use strict'; + +const { Transform } = require('stream'); + +let hooks; +let response; + +const abort = jest.fn(); + +const stream = jest.fn(() => { + response = new Transform({ objectMode: true }); + setImmediate(() => { + const request = { abort }; + response.emit('request', request); + }); + return response; +}); + +const extend = jest.fn((...args) => { + [{ hooks }] = args; + return { stream }; +}); + +function getMocks() { + return { abort, extend, response, stream }; +} + +function mockClear() { + hooks = {}; + abort.mockClear(); + extend.mockClear(); + stream.mockClear(); +} + +function getHooks() { + return hooks; +} + +module.exports = { extend, getHooks, getMocks, mockClear }; diff --git a/lib/__mocks__/lifion-aws-event-stream.js b/lib/__mocks__/lifion-aws-event-stream.js new file mode 100644 index 00000000..b1f1172b --- /dev/null +++ b/lib/__mocks__/lifion-aws-event-stream.js @@ -0,0 +1,15 @@ +'use strict'; + +const { Transform } = require('stream'); + +function Parser() { + return new Transform({ + objectMode: true, + transform(chunk, encoding, callback) { + this.push(chunk); + callback(); + } + }); +} + +module.exports = { Parser }; diff --git a/lib/fan-out-consumer.js b/lib/fan-out-consumer.js index 381ec4a8..513bb52e 100644 --- a/lib/fan-out-consumer.js +++ b/lib/fan-out-consumer.js @@ -1,3 +1,10 @@ +/** + * Module that implements an AWS enhanced fan-out consumer. + * + * @module fan-out-consumer + * @private + */ + 'use strict'; const aws4 = require('aws4'); @@ -22,25 +29,62 @@ const asyncPipeline = promisify(pipeline); const privateData = new WeakMap(); const wait = promisify(setTimeout); +/** + * Provides access to the private data of the specified instance. + * + * @param {Object} instance - The private data's owner. + * @returns {Object} The private data. + * @private + */ function internal(instance) { if (!privateData.has(instance)) privateData.set(instance, {}); return privateData.get(instance); } +/** + * Class that implements a pre-processing stream used as a filter to a stream request to the + * shard subscription API. If the request is successful and the response is an event stream, + * chunks are passed to subsequent streams in the pipeline. If the server responds with an error, + * the error details are parsed and then thrown as an error, which breaks the entire pipeline. 
+ *
+ * @extends external:Transform
+ * @memberof module:fan-out-consumer
+ * @private
+ */
 class PreProcess extends Transform {
+  /**
+   * Initializes an instance of the pre-processing stream.
+   *
+   * @param {Object} options - The initialization options.
+   * @param {Object} options.requestFlags - The object where the flags for the request are stored.
+   * @param {boolean} options.requestFlags.isEventStream - If the request is successful and the
+   *        headers in the initial response point to an event stream, this flag is set to `true`.
+   * @param {number} options.requestFlags.statusCode - The status code of the last request response.
+   */
   constructor({ requestFlags }) {
     super({ objectMode: true });
     Object.assign(internal(this), { requestFlags });
   }

+  /**
+   * The stream transformation logic.
+   *
+   * @param {Buffer} chunk - A chunk of data coming from the event stream.
+   * @param {string} encoding - The stream encoding mode (ignored).
+   * @param {Function} callback - The callback for more data.
+   */
   _transform(chunk, encoding, callback) {
     const { requestFlags } = internal(this);
     if (!requestFlags.isEventStream) {
+      const { statusCode } = requestFlags;
       const { __type, message } = JSON.parse(chunk.toString('utf8'));
-      const err = new Error(message || 'Failed to subscribe to shard.');
-      if (__type) err.code = __type;
-      err.isRetryable = true;
-      this.emit('error', err);
+      const error = Object.assign(
+        new Error(message || 'Failed to subscribe to shard.'),
+        { isRetryable: true },
+        __type && { code: __type },
+        statusCode && { statusCode }
+      );
+      this.emit('error', error);
     } else {
       this.push(chunk);
     }
@@ -48,7 +92,28 @@ class PreProcess extends Transform {
   }
 }

+/**
+ * Class that implements a post-processing stream used to push records outside the internal
+ * stream pipeline. It also stores checkpoints as records arrive and looks for shard depletion.
+ *
+ * @extends external:Writable
+ * @memberof module:fan-out-consumer
+ * @private
+ */
 class PostProcess extends Writable {
+  /**
+   * Initializes an instance of the post-processing stream.
+   *
+   * @param {Object} options - The initialization options.
+   * @param {Function} options.abort - A function that will close the entire pipeline, called
+   *        when no data has been pushed through the event stream within a given time window.
+   * @param {Object} options.logger - An instance of a logger.
+   * @param {Function} options.markShardAsDepleted - A function that will mark a given shard as
+   *        depleted. Called when a shard depletion event has been detected.
+   * @param {Function} options.pushToStream - A function that pushes records out of the pipeline.
+   * @param {Function} options.setCheckpoint - A function that stores the checkpoint for the shard.
+   * @param {string} options.shardId - The ID of the shard.
+   */
   constructor({ abort, logger, markShardAsDepleted, pushToStream, setCheckpoint, shardId }) {
     super({ objectMode: true });
     Object.assign(internal(this), {
@@ -62,6 +127,13 @@ class PostProcess extends Writable {
     });
   }

+  /**
+   * The stream writable logic.
+   *
+   * @param {Object} chunk - A chunk of data coming from the pipeline.
+   * @param {string} encoding - The stream encoding mode (ignored).
+   * @param {Function} callback - The callback for more data.
+   */
   async _write(chunk, encoding, callback) {
     const {
       abort,
@@ -90,7 +162,29 @@ class PostProcess extends Writable {
   }
 }

+/**
+ * Class that implements an AWS enhanced fan-out consumer.
+ * + * @alias module:fan-out-consumer + */ class FanOutConsumer { + /** + * Initializes an instance of an enhanced fan-out consumer. + * + * @param {Object} options - The initialization options. + * @param {Object} options.awsOptions - The AWS.Kinesis options to use in the HTTP request. + * @param {string} options.checkpoint - The last-known checkpoint for the stream shard. + * @param {Object} options.client - An instance of the Kinesis client. + * @param {string} options.compression - The kind of data compression to use with records. + * @param {string} options.consumerArn - The ARN of the enhanced consumer as registered in AWS. + * @param {string} options.leaseExpiration - The timestamp of the shard lease expiration. + * @param {Object} options.logger - An instance of a logger. + * @param {Function} options.pushToStream - A function to push incoming records to the consumer. + * @param {string} options.shardId - The ID of the stream shard to subscribe for records. + * @param {Object} options.stateStore - An instance of the state store. + * @param {Function} options.stopConsumer - A function that stops this consumer from the manager. + * @param {string} options.streamName - The name of the Kinesis stream. + */ constructor(options) { const { awsOptions, @@ -145,6 +239,12 @@ class FanOutConsumer { }); } + /** + * Starts the enhanced fan-out consumer by initializing the internal stream pipeline. + * + * @fulfil {undefined} + * @returns {Promise} + */ async start() { const privateProps = internal(this); const { @@ -161,7 +261,7 @@ class FanOutConsumer { streamName } = privateProps; - logger.debug(`Starting an enhanded fan-out subscriber for shard "${shardId}"…`); + logger.debug(`Starting an enhanced fan-out subscriber for shard "${shardId}"…`); this.updateLeaseExpiration(leaseExpiration); @@ -176,6 +276,7 @@ class FanOutConsumer { const handleResponse = async res => { const { headers, statusCode } = res; + requestFlags.statusCode = statusCode; if (headers['content-type'] !== AWS_EVENT_STREAM || statusCode !== 200) { logger.error(`Subscription unsuccessful: ${statusCode}`); requestFlags.isEventStream = false; @@ -248,12 +349,13 @@ class FanOutConsumer { const { code, message, requestId, statusCode } = err; if (!shouldBailRetry(err)) { logger.warn( - `Trying to recover from AWS.Kinesis error…\n${[ - `\t- Message: ${message}`, - `\t- Request ID: ${requestId}`, - `\t- Code: ${code} (${statusCode})`, - `\t- Stream: ${streamName}` - ].join('\n')}` + [ + 'Trying to recover from AWS.Kinesis error…', + `- Message: ${message}`, + `- Request ID: ${requestId}`, + `- Code: ${code} (${statusCode})`, + `- Stream: ${streamName}` + ].join('\n\t') ); } else { pushToStream(err); @@ -266,6 +368,9 @@ class FanOutConsumer { abort(); } + /** + * Stops the internal stream pipeline. + */ stop() { const privateProps = internal(this); const { expirationTimeoutId, request } = privateProps; @@ -278,6 +383,11 @@ class FanOutConsumer { privateProps.expirationTimeoutId = null; } + /** + * Updates the shard lease expiration timestamp. + * + * @param {string} leaseExpiration - The updated timestamp when the shard lease expires. 
+ */ updateLeaseExpiration(leaseExpiration) { const privateProps = internal(this); const { expirationTimeoutId, logger, shardId, stopConsumer } = privateProps; @@ -297,4 +407,14 @@ class FanOutConsumer { } } +/** + * @external Transform + * @see https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_class_stream_transform + */ + +/** + * @external Writable + * @see https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_class_stream_writable + */ + module.exports = FanOutConsumer; diff --git a/lib/fan-out-consumer.test.js b/lib/fan-out-consumer.test.js index fbbecefb..20a7d82b 100644 --- a/lib/fan-out-consumer.test.js +++ b/lib/fan-out-consumer.test.js @@ -1,10 +1,429 @@ 'use strict'; +const aws = require('aws-sdk'); +const got = require('got'); +const mockStream = require('stream'); + const FanOutConsumer = require('./fan-out-consumer'); +const records = require('./records'); +const stats = require('./stats'); + +jest.mock('util', () => { + const { promisify, ...otherUtils } = jest.requireActual('util'); + return { + ...otherUtils, + promisify: (...args) => { + const [func] = args; + if (func.name === 'setTimeout') { + return () => new Promise(resolve => setImmediate(resolve)); + } + return promisify(...args); + } + }; +}); + +jest.mock('./records', () => { + const RecordsDecoder = jest.fn( + () => + new mockStream.Transform({ + objectMode: true, + transform(chunk, encoding, callback) { + this.push(chunk); + callback(); + } + }) + ); + return { RecordsDecoder }; +}); + +jest.mock('./stream', () => ({ + getStreamShards: () => ({}) +})); + +jest.mock('./stats'); + +jest.useFakeTimers(); describe('lib/fan-out-consumer', () => { + const debug = jest.fn(); + const error = jest.fn(); + const warn = jest.fn(); + const logger = { debug, error, warn }; + + const getShardsData = jest.fn(() => + Promise.resolve({ shardsPath: '#a', shardsPathNames: { '#a': 'a' } }) + ); + const markShardAsDepleted = jest.fn(); + const storeShardCheckpoint = jest.fn(); + const stateStore = { getShardsData, markShardAsDepleted, storeShardCheckpoint }; + + const pushToStream = jest.fn(); + const stopConsumer = jest.fn(); + const options = { + awsOptions: { region: 'us-east-1' }, + checkpoint: null, + client: {}, + compression: 'LZ-UTF8', + consumerArn: 'arn:enhanced-consumer', + leaseExpiration: new Date(Date.now() + 5 * 60 * 1000).toISOString(), + logger, + pushToStream, + shardId: 'shard-0001', + stateStore, + stopConsumer, + streamName: 'test-stream' + }; + + function nextTickWait() { + return new Promise(resolve => setImmediate(resolve)); + } + + afterEach(() => { + aws.mockClear(); + debug.mockClear(); + error.mockClear(); + getShardsData.mockClear(); + got.mockClear(); + markShardAsDepleted.mockClear(); + pushToStream.mockClear(); + stopConsumer.mockClear(); + storeShardCheckpoint.mockClear(); + warn.mockClear(); + stats.reportError.mockClear(); + stats.reportResponse.mockClear(); + records.RecordsDecoder.mockClear(); + clearTimeout.mockClear(); + setTimeout.mockClear(); + jest.clearAllTimers(); + }); + test('the module exports the expected', () => { expect(FanOutConsumer).toEqual(expect.any(Function)); expect(FanOutConsumer).toThrow('Class constructor'); }); + + test('the constructor adds a hook to sign requests with resolved credentials', async () => { + const consumer = new FanOutConsumer(options); + expect(consumer).toBeInstanceOf(FanOutConsumer); + const { extend } = got.getMocks(); + expect(extend).toHaveBeenCalledWith({ + baseUrl: 'https://kinesis.us-east-1.amazonaws.com', + headers: { 
'Content-Type': 'application/x-amz-json-1.1' }, + hooks: { beforeRequest: [expect.any(Function)] }, + region: 'us-east-1', + throwHttpErrors: false + }); + const { beforeRequest } = got.getHooks(); + const [signRequest] = beforeRequest; + expect(signRequest).toBeInstanceOf(Function); + const requestOptions = {}; + await signRequest(requestOptions); + expect(requestOptions).toEqual({ + headers: { + Authorization: expect.stringMatching(/^AWS4-HMAC-SHA256 Credential=resolved-access-key-id/), + Host: '.us-east-1.amazonaws.com', + 'X-Amz-Date': expect.stringMatching(/^\d{8}T\d{6}Z$/), + 'X-Amz-Security-Token': 'resolved-session-token' + }, + hostname: '.us-east-1.amazonaws.com', + path: '/' + }); + }); + + test('the constructor adds a hook to sign requests with the passed credentials', async () => { + const consumer = new FanOutConsumer({ + awsOptions: { accessKeyId: 'foo', secretAccessKey: 'bar', sessionToken: 'baz' } + }); + expect(consumer).toBeInstanceOf(FanOutConsumer); + const { beforeRequest } = got.getHooks(); + const [signRequest] = beforeRequest; + expect(signRequest).toBeInstanceOf(Function); + const requestOptions = {}; + await signRequest(requestOptions); + expect(requestOptions).toEqual({ + headers: { + Authorization: expect.stringMatching(/^AWS4-HMAC-SHA256 Credential=foo/), + Host: '.us-east-1.amazonaws.com', + 'X-Amz-Date': expect.stringMatching(/^\d{8}T\d{6}Z$/), + 'X-Amz-Security-Token': 'baz' + }, + hostname: '.us-east-1.amazonaws.com', + path: '/' + }); + }); + + test('starting the consumer creates a streaming pipeline that pushes records', async () => { + const consumer = new FanOutConsumer(options); + const start = consumer.start(); + await nextTickWait(); + const { response, stream } = got.getMocks(); + response.emit('response', { + headers: { 'content-type': 'application/vnd.amazon.eventstream' }, + statusCode: 200 + }); + response.push({ + continuationSequenceNumber: '2', + millisBehindLatest: 0, + records: [{ foo: 'bar' }] + }); + await nextTickWait(); + response.emit('error', Object.assign(new Error('foo'), { code: 'ValidationException' })); + await start; + expect(stream).toHaveBeenCalledWith('/', { + body: JSON.stringify({ + ConsumerARN: 'arn:enhanced-consumer', + ShardId: 'shard-0001', + StartingPosition: { Type: 'LATEST' } + }), + headers: { 'X-Amz-Target': 'Kinesis_20131202.SubscribeToShard' }, + service: 'kinesis' + }); + expect(records.RecordsDecoder).toHaveBeenCalledWith({ compression: 'LZ-UTF8' }); + expect(clearTimeout).toHaveBeenNthCalledWith(1, null); + expect(clearTimeout).toHaveBeenNthCalledWith(2, null); + expect(clearTimeout).toHaveBeenCalledTimes(2); + expect(setTimeout).toHaveBeenNthCalledWith(1, expect.any(Function), expect.any(Number)); + expect(setTimeout).toHaveBeenNthCalledWith(2, expect.any(Function), 10000); + expect(setTimeout).toHaveBeenCalledTimes(2); + expect(storeShardCheckpoint).toHaveBeenCalledWith('shard-0001', '2', '#a', { '#a': 'a' }); + expect(debug).toHaveBeenNthCalledWith( + 1, + 'Starting an enhanced fan-out subscriber for shard "shard-0001"…' + ); + expect(debug).toHaveBeenNthCalledWith(2, 'Subscription to shard is successful.'); + expect(debug).toHaveBeenNthCalledWith(3, 'Got 1 record(s) from "shard-0001" (0ms behind)'); + expect(debug).toHaveBeenCalledTimes(3); + expect(pushToStream).toHaveBeenNthCalledWith(1, null, { + continuationSequenceNumber: '2', + millisBehindLatest: 0, + records: [{ foo: 'bar' }], + shardId: 'shard-0001' + }); + expect(pushToStream).toHaveBeenCalledTimes(2); + 
expect(stats.reportResponse).toHaveBeenCalledWith('kinesis', 'test-stream'); + expect(stats.reportError).not.toHaveBeenCalled(); + }); + + test('the shard checkpoint is used as the starting point if available', async () => { + const consumer = new FanOutConsumer({ ...options, checkpoint: '1' }); + const start = consumer.start(); + await nextTickWait(); + const { response, stream } = got.getMocks(); + response.emit('response', { + headers: { 'content-type': 'application/vnd.amazon.eventstream' }, + statusCode: 200 + }); + response.push({ + continuationSequenceNumber: '2', + millisBehindLatest: 0, + records: [{ foo: 'bar' }] + }); + await nextTickWait(); + response.emit('error', Object.assign(new Error('foo'), { code: 'ValidationException' })); + await start; + expect(stream).toHaveBeenCalledWith('/', { + body: JSON.stringify({ + ConsumerARN: 'arn:enhanced-consumer', + ShardId: 'shard-0001', + StartingPosition: Object.assign({ Type: 'AFTER_SEQUENCE_NUMBER' }, { SequenceNumber: '1' }) + }), + headers: { 'X-Amz-Target': 'Kinesis_20131202.SubscribeToShard' }, + service: 'kinesis' + }); + }); + + test('non event stream responses are reported as errors', async () => { + const consumer = new FanOutConsumer(options); + const start = consumer.start(); + await nextTickWait(); + const { response } = got.getMocks(); + response.emit('response', { headers: { 'content-type': 'application/json' }, statusCode: 500 }); + response.push(JSON.stringify({ __type: 'UnknownOperationException' })); + await start; + expect(clearTimeout).toHaveBeenNthCalledWith(1, null); + expect(clearTimeout).toHaveBeenCalledTimes(1); + expect(setTimeout).toHaveBeenNthCalledWith(1, expect.any(Function), expect.any(Number)); + expect(setTimeout).toHaveBeenCalledTimes(1); + expect(storeShardCheckpoint).not.toHaveBeenCalled(); + expect(debug).toHaveBeenCalledTimes(1); + expect(error).toHaveBeenNthCalledWith(1, 'Subscription unsuccessful: 500'); + expect(error).toHaveBeenNthCalledWith( + 2, + 'Pipeline closed with error: [UnknownOperationException] Failed to subscribe to shard.' + ); + expect(error).toHaveBeenCalledTimes(2); + expect(pushToStream).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + code: 'UnknownOperationException', + isRetryable: true, + message: 'Failed to subscribe to shard.' 
+      })
+    );
+    expect(pushToStream).toHaveBeenCalledTimes(1);
+    expect(stats.reportResponse).not.toHaveBeenCalled();
+    expect(stats.reportError).toHaveBeenCalledWith('kinesis', { statusCode: 500 }, 'test-stream');
+  });
+
+  test('an empty array of records in the stream is not pushed outside the pipeline', async () => {
+    const consumer = new FanOutConsumer(options);
+    const start = consumer.start();
+    await nextTickWait();
+    const { response } = got.getMocks();
+    response.emit('response', {
+      headers: { 'content-type': 'application/vnd.amazon.eventstream' },
+      statusCode: 200
+    });
+    response.push({
+      continuationSequenceNumber: '2',
+      millisBehindLatest: 0,
+      records: []
+    });
+    await nextTickWait();
+    response.emit('error', Object.assign(new Error('foo'), { code: 'ValidationException' }));
+    await start;
+    expect(storeShardCheckpoint).toHaveBeenCalledWith('shard-0001', '2', '#a', { '#a': 'a' });
+    expect(debug).toHaveBeenNthCalledWith(2, 'Subscription to shard is successful.');
+    expect(debug).toHaveBeenCalledTimes(2);
+    expect(pushToStream).toHaveBeenCalledTimes(1);
+    expect(stats.reportResponse).toHaveBeenCalledWith('kinesis', 'test-stream');
+    expect(stats.reportError).not.toHaveBeenCalled();
+  });
+
+  test("a shard is marked as depleted if there's no continuation sequence number", async () => {
+    const consumer = new FanOutConsumer(options);
+    const start = consumer.start();
+    await nextTickWait();
+    const { response } = got.getMocks();
+    response.emit('response', {
+      headers: { 'content-type': 'application/vnd.amazon.eventstream' },
+      statusCode: 200
+    });
+    response.push({ millisBehindLatest: 0, records: [] });
+    await nextTickWait();
+    response.emit('error', Object.assign(new Error('foo'), { code: 'ValidationException' }));
+    await start;
+    expect(markShardAsDepleted).toHaveBeenCalledWith({}, 'shard-0001');
+    expect(storeShardCheckpoint).not.toHaveBeenCalled();
+    expect(debug).toHaveBeenNthCalledWith(3, 'The parent shard "shard-0001" has been depleted.');
+    expect(debug).toHaveBeenCalledTimes(3);
+  });
+
+  test('a stream should be recreated on a timeout to receive event data', async () => {
+    setTimeout.mockImplementationOnce(() => {});
+    setTimeout.mockImplementationOnce((func, delay, ...args) => {
+      expect(delay).toBe(10000);
+      func(...args);
+    });
+    const consumer = new FanOutConsumer(options);
+    const start = consumer.start();
+    await nextTickWait();
+    const { abort, response } = got.getMocks();
+    response.emit('response', {
+      headers: { 'content-type': 'application/vnd.amazon.eventstream' },
+      statusCode: 200
+    });
+    response.push({
+      millisBehindLatest: 0,
+      records: []
+    });
+    await nextTickWait();
+    response.emit('error', Object.assign(new Error('foo'), { code: 'ValidationException' }));
+    await start;
+    expect(abort).toHaveBeenCalled();
+  });
+
+  test("stopping a consumer shouldn't conflict with timing a stream pipeline out", async () => {
+    setTimeout.mockImplementationOnce(() => {});
+    setTimeout.mockImplementationOnce((func, delay, ...args) => {
+      expect(delay).toBe(10000);
+      func(...args);
+    });
+    const consumer = new FanOutConsumer(options);
+    const start = consumer.start();
+    await nextTickWait();
+    consumer.stop();
+    const { abort, response } = got.getMocks();
+    response.emit('response', {
+      headers: { 'content-type': 'application/vnd.amazon.eventstream' },
+      statusCode: 200
+    });
+    response.push({
+      millisBehindLatest: 0,
+      records: []
+    });
+    await nextTickWait();
+    response.emit('error', Object.assign(new Error('foo'), { code: 'ValidationException' }));
+    await
start; + expect(abort).toHaveBeenCalled(); + }); + + test('a consumer can be stopped twice', async () => { + const consumer = new FanOutConsumer(options); + consumer.start(); + consumer.stop(); + consumer.stop(); + expect(setTimeout).toHaveBeenCalledTimes(1); + expect(clearTimeout).toHaveBeenCalledTimes(3); + }); + + test("updating the lease to an expired timestamp doesn't schedule timeouts", async () => { + const consumer = new FanOutConsumer(options); + consumer.updateLeaseExpiration(0); + expect(setTimeout).toHaveBeenCalledTimes(0); + expect(clearTimeout).toHaveBeenCalledTimes(1); + }); + + test('updating the lease to a future timestamp schedules a timeout', async () => { + const consumer = new FanOutConsumer(options); + consumer.updateLeaseExpiration(new Date(Date.now() + 5 * 60 * 1000)); + expect(setTimeout).toHaveBeenCalledTimes(1); + expect(clearTimeout).toHaveBeenCalledTimes(1); + }); + + test('the consumer should be stopped once the lease expires', async () => { + setTimeout.mockImplementationOnce((func, delay, ...args) => func(...args)); + const consumer = new FanOutConsumer(options); + consumer.updateLeaseExpiration(new Date(Date.now() + 5 * 60 * 1000)); + expect(stopConsumer).toHaveBeenCalledWith('shard-0001'); + expect(debug).toHaveBeenNthCalledWith(1, 'The lease for "shard-0001" has expired.'); + expect(debug).toHaveBeenCalledTimes(1); + }); + + test('the consumer will recreate the pipeline on retryable errors', async () => { + const consumer = new FanOutConsumer({ ...options, checkpoint: '1' }); + const start = consumer.start(); + await nextTickWait(); + let { response } = got.getMocks(); + response.emit('response', { + headers: { 'content-type': 'application/json' }, + statusCode: 500 + }); + response.push( + JSON.stringify({ + __type: 'InternalServerError', + message: 'Unexpected Server Error' + }) + ); + await nextTickWait(); + await nextTickWait(); + const gotMocks = got.getMocks(); + ({ response } = gotMocks); + const { stream } = gotMocks; + response.emit('error', Object.assign(new Error('foo'), { code: 'UnknownOperationException' })); + await start; + expect(stream).toHaveBeenCalledTimes(2); + expect(warn).toHaveBeenNthCalledWith( + 1, + [ + 'Trying to recover from AWS.Kinesis error…', + '- Message: Unexpected Server Error', + '- Request ID: undefined', + '- Code: InternalServerError (500)', + '- Stream: test-stream' + ].join('\n\t') + ); + expect(warn).toHaveBeenNthCalledWith(2, 'Waiting before retrying the pipeline…'); + expect(warn).toHaveBeenCalledTimes(2); + }); });
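
Reviewer note (not part of the patch): the two `beforeRequest` tests above assert on the output of SigV4 signing with credentials resolved through `AWS.CredentialProviderChain`, which is why the aws-sdk mock now exposes `CredentialProviderChain` and `resolvePromise`. The sketch below is only meant to orient reviewers on that flow; the helper name `makeSignRequestHook`, the hard-coded `service`, and the exact option wiring are assumptions, not the actual code in `lib/fan-out-consumer.js`.

'use strict';

// Reviewer sketch only. Resolve credentials, then SigV4-sign the outgoing
// SubscribeToShard request with aws4, as exercised by the `signRequest` tests.
const aws4 = require('aws4');
const { CredentialProviderChain } = require('aws-sdk');

function makeSignRequestHook({ region, accessKeyId, secretAccessKey, sessionToken } = {}) {
  return async requestOptions => {
    // Prefer credentials passed through awsOptions; otherwise walk the default
    // provider chain (environment, shared config, instance profile, …).
    const credentials =
      accessKeyId && secretAccessKey
        ? { accessKeyId, secretAccessKey, sessionToken }
        : await new CredentialProviderChain().resolvePromise();
    // aws4.sign mutates the options in place, adding Host, X-Amz-Date,
    // X-Amz-Security-Token and the AWS4-HMAC-SHA256 Authorization header,
    // which are the fields the assertions above check for.
    return aws4.sign(Object.assign(requestOptions, { region, service: 'kinesis' }), credentials);
  };
}

// Hypothetical wiring, mirroring the got.extend call asserted in the tests:
//   got.extend({
//     baseUrl: `https://kinesis.${region}.amazonaws.com`,
//     headers: { 'Content-Type': 'application/x-amz-json-1.1' },
//     hooks: { beforeRequest: [makeSignRequestHook(awsOptions)] },
//     region,
//     throwHttpErrors: false
//   });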