Skip to content

Commit

Permalink
Add test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
eaviles committed Sep 25, 2018
1 parent 40f156f commit 562bea1
Show file tree
Hide file tree
Showing 7 changed files with 571 additions and 20 deletions.
113 changes: 113 additions & 0 deletions lib/__mocks__/aws-sdk.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
'use strict';

const mockData = {};

function resetMockData() {
mockData.Consumers = [];
mockData.Streams = [];
}

const addTagsToStream = jest.fn(() => ({ promise: () => Promise.resolve() }));

const createStream = jest.fn(params => {
const { StreamName } = params;
const Stream = {
StreamName,
StreamStatus: 'CREATING',
StreamARN: [
'arn:aws:kinesis:us-east-1',
Math.floor(Math.random() * 1e12),
`stream/${StreamName}`
].join(':')
};
mockData.Streams.push(Stream);
return { promise: () => Promise.resolve({}) };
});

const describeStream = jest.fn(params => {
const { StreamName } = params;
const StreamDescription = mockData.Streams.find(i => i.StreamName === StreamName);
if (!StreamDescription) {
const err = new Error("The stream doesn't exists.");
err.code = 'ResourceNotFoundException';
return { promise: () => Promise.reject(err) };
}
return { promise: () => Promise.resolve({ StreamDescription }) };
});

const listStreamConsumers = jest.fn(() => {
const { Consumers } = mockData;
return { promise: () => Promise.resolve({ Consumers }) };
});

const listTagsForStream = jest.fn(params => {
const { StreamName } = params;
const { Tags = [] } = mockData.Streams.find(i => i.StreamName === StreamName);
return { promise: () => Promise.resolve({ Tags }) };
});

const registerStreamConsumer = jest.fn(params => {
const { ConsumerName } = params;
const Consumer = {
ConsumerARN: [
'arn:aws:kinesis:us-east-1',
Math.floor(Math.random() * 1e12),
`stream/test/consumer/${ConsumerName.toLowerCase()}`,
Math.floor(Math.random() * 1e12)
].join(':'),
ConsumerName,
ConsumerStatus: 'ACTIVE'
};
mockData.Consumers.push(Consumer);
return {
promise: () => Promise.resolve({ Consumer: { ...Consumer, ConsumerStatus: 'CREATING' } })
};
});

const startStreamEncryption = jest.fn(() => ({ promise: () => Promise.resolve({}) }));

const waitFor = jest.fn((state, { StreamName }) => {
const StreamDescription = mockData.Streams.find(i => i.StreamName === StreamName);
return { promise: () => Promise.resolve({ StreamDescription }) };
});

const Kinesis = jest.fn(() => ({
addTagsToStream,
createStream,
describeStream,
listStreamConsumers,
listTagsForStream,
registerStreamConsumer,
startStreamEncryption,
waitFor
}));

function mockClear() {
addTagsToStream.mockClear();
createStream.mockClear();
describeStream.mockClear();
listStreamConsumers.mockClear();
listTagsForStream.mockClear();
registerStreamConsumer.mockClear();
startStreamEncryption.mockClear();
waitFor.mockClear();
Kinesis.mockClear();
resetMockData();
}

function mockConsumers() {
return mockData.Consumers;
}

function mockStreams() {
return mockData.Streams;
}

resetMockData();

module.exports = {
Kinesis,
mockClear,
mockConsumers,
mockStreams
};
20 changes: 10 additions & 10 deletions lib/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@ module.exports.activate = async ctx => {
const consumer = await describeStreamConsumer(ConsumerName, client, StreamARN);
let { ConsumerStatus, ConsumerARN } = consumer;

if (ConsumerStatus === 'ACTIVE') {
logger.debug('The stream consumer exists already and is active.');
}

if (ConsumerStatus === 'DELETING') {
logger.debug('Waiting for the stream consumer to complete deletion…');
let checks = 0;
while ((await describeStreamConsumer()).ConsumerStatus) {
await wait(CONSUMER_STATE_CHECK_DELAY);
checks += 1;
if (checks > CONSUMER_MAX_STATE_CHECKS) {
throw new Error(
`Maximum checks reached while waiting for consumer "${ConsumerName}" to get deleted.`
);
const errMsg = `Consumer "${ConsumerName}" exceeded the maximum wait time for deletion.`;
logger.error(errMsg);
throw new Error(errMsg);
}
}
logger.debug('The stream consumer is now gone.');
Expand All @@ -55,17 +59,13 @@ module.exports.activate = async ctx => {
await wait(CONSUMER_STATE_CHECK_DELAY);
checks += 1;
if (checks > CONSUMER_MAX_STATE_CHECKS) {
throw new Error(
`Maximum checks reached while waiting for consumer "${ConsumerName}" to get active.`
);
const errMsg = `Consumer "${ConsumerName}" exceeded the maximum wait time for activation.`;
logger.error(errMsg);
throw new Error(errMsg);
}
}
logger.debug('The stream consumer is now active.');
}

if (ConsumerStatus === 'ACTIVE') {
logger.debug('The stream consumer already exists.');
}

return ConsumerARN;
};
149 changes: 149 additions & 0 deletions lib/consumer.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
'use strict';

const { Kinesis, mockClear, mockConsumers } = require('aws-sdk');
const consumer = require('./consumer');

describe('lib/consumer', () => {
let client;
let logger;
let ctx;

beforeEach(() => {
jest.useFakeTimers();
client = new Kinesis();
logger = { debug: jest.fn(), error: jest.fn() };
ctx = {
client,
consumerName: 'foo',
logger,
streamArn: 'bar',
streamName: 'baz'
};
});

afterEach(() => {
mockClear();
});

test('the module exports the expected', () => {
expect(consumer).toEqual({
activate: expect.any(Function)
});
});

test("activate registers a consumer and return its ARN if it doesn't exists", async () => {
await expect(consumer.activate(ctx)).resolves.toMatch(/^arn:aws:kinesis/);
expect(client.registerStreamConsumer).toBeCalledWith({
ConsumerName: 'foo',
StreamARN: 'bar'
});
expect(logger.debug.mock.calls).toEqual([
['Checking if the "foo" consumer for "baz" exists…'],
['Trying to register the consumer…'],
['Waiting until the stream consumer is active…'],
['The stream consumer is now active.']
]);
});

test("activate doesn't tries to register for an already active consumer", async () => {
const mockConsumer = {
ConsumerARN: 'qux',
ConsumerName: 'foo',
ConsumerStatus: 'ACTIVE'
};
mockConsumers().push(mockConsumer);
setTimeout.mockImplementationOnce(callback => {
mockConsumer.ConsumerStatus = 'ACTIVE';
callback();
});
await expect(consumer.activate(ctx)).resolves.toBe('qux');
expect(client.registerStreamConsumer).not.toBeCalled();
expect(logger.debug.mock.calls).toEqual([
['Checking if the "foo" consumer for "baz" exists…'],
['The stream consumer exists already and is active.']
]);
});

test("activate waits for a consumer if it's in creating state", async () => {
const mockConsumer = {
ConsumerARN: 'qux',
ConsumerName: 'foo',
ConsumerStatus: 'CREATING'
};
mockConsumers().push(mockConsumer);
setTimeout.mockImplementationOnce(callback => {
mockConsumer.ConsumerStatus = 'ACTIVE';
callback();
});
await expect(consumer.activate(ctx)).resolves.toBe('qux');
expect(client.registerStreamConsumer).not.toBeCalled();
expect(logger.debug.mock.calls).toEqual([
['Checking if the "foo" consumer for "baz" exists…'],
['Waiting until the stream consumer is active…'],
['The stream consumer is now active.']
]);
});

test('activate throws if waiting too long for a consumer that is in creating state', async () => {
const mockConsumer = {
ConsumerARN: 'qux',
ConsumerName: 'foo',
ConsumerStatus: 'CREATING'
};
mockConsumers().push(mockConsumer);
setTimeout.mockImplementation(callback => callback());
await expect(consumer.activate(ctx)).rejects.toThrow(
'Consumer "foo" exceeded the maximum wait time for activation.'
);
expect(logger.debug.mock.calls).toEqual([
['Checking if the "foo" consumer for "baz" exists…'],
['Waiting until the stream consumer is active…']
]);
expect(logger.error.mock.calls).toEqual([
['Consumer "foo" exceeded the maximum wait time for activation.']
]);
});

test("activate waits for a consumer if it's in deleting state before creating one", async () => {
const mockConsumer = {
ConsumerARN: 'qux',
ConsumerName: 'foo',
ConsumerStatus: 'DELETING'
};
mockConsumers().push(mockConsumer);
setTimeout.mockImplementationOnce(callback => {
mockConsumers().length = 0;
callback();
});
await expect(consumer.activate(ctx)).resolves.toMatch(/^arn:aws:kinesis/);
expect(client.registerStreamConsumer).toBeCalledWith({ ConsumerName: 'foo', StreamARN: 'bar' });
expect(logger.debug.mock.calls).toEqual([
['Checking if the "foo" consumer for "baz" exists…'],
['Waiting for the stream consumer to complete deletion…'],
['The stream consumer is now gone.'],
['Trying to register the consumer…'],
['Waiting until the stream consumer is active…'],
['The stream consumer is now active.']
]);
});

test('activate throws if waiting too long for a consumer that is in deleting state', async () => {
const mockConsumer = {
ConsumerARN: 'qux',
ConsumerName: 'foo',
ConsumerStatus: 'DELETING'
};
mockConsumers().push(mockConsumer);
setTimeout.mockImplementation(callback => callback());
await expect(consumer.activate(ctx)).rejects.toThrow(
'Consumer "foo" exceeded the maximum wait time for deletion.'
);
expect(logger.debug.mock.calls).toEqual([
['Checking if the "foo" consumer for "baz" exists…'],
['Waiting for the stream consumer to complete deletion…']
]);
expect(logger.error.mock.calls).toEqual([
['Consumer "foo" exceeded the maximum wait time for deletion.']
]);
});
});
10 changes: 4 additions & 6 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
/* eslint-disable no-await-in-loop */

'use strict';

const { Kinesis } = require('aws-sdk');
const { Kinesis: AwsKinesis } = require('aws-sdk');
const { noop } = require('./utils');
const consumer = require('./consumer');
const stream = require('./stream');
Expand All @@ -14,7 +12,7 @@ function internal(instance) {
return privateData.get(instance);
}

class KinesisClient {
class Kinesis {
constructor(options = {}) {
const {
consumerName,
Expand Down Expand Up @@ -61,7 +59,7 @@ class KinesisClient {
const { encryption, tags, logger, options } = ctx;

logger.debug('Trying to connect the client…');
ctx.client = new Kinesis(options);
ctx.client = new AwsKinesis(options);

ctx.streamArn = await stream.activate(ctx);
if (encryption) await stream.encrypt(ctx);
Expand All @@ -73,4 +71,4 @@ class KinesisClient {
}
}

module.exports = KinesisClient;
module.exports = Kinesis;
Loading

0 comments on commit 562bea1

Please sign in to comment.