Skip to content

Commit

Permalink
Add test coverage for when stopping consumers on start failure
Browse files Browse the repository at this point in the history
  • Loading branch information
eaviles committed Aug 23, 2019
1 parent 9c156be commit f02d3d0
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 11 deletions.
1 change: 0 additions & 1 deletion lib/fan-out-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,6 @@ class FanOutConsumer {
const isArnInUseError =
code === 'ResourceInUseException' &&
message.includes('Another active subscription exists');
console.warn({ isArnInUseError });
if (!shouldBailRetry(err) || isArnInUseError) {
logger.warn(
[
Expand Down
65 changes: 59 additions & 6 deletions lib/fan-out-consumer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,11 @@ describe('lib/fan-out-consumer', () => {
expect(setTimeout).toHaveBeenCalledTimes(1);
expect(storeShardCheckpoint).not.toHaveBeenCalled();
expect(debug).toHaveBeenCalledTimes(1);
expect(error).toHaveBeenNthCalledWith(1, 'Subscription unsuccessful: 500');
expect(error).toHaveBeenNthCalledWith(
2,
1,
'Pipeline closed with error: [UnknownOperationException] Failed to subscribe to shard.'
);
expect(error).toHaveBeenCalledTimes(2);
expect(error).toHaveBeenCalledTimes(1);
expect(pushToStream).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
Expand All @@ -265,6 +264,21 @@ describe('lib/fan-out-consumer', () => {
expect(stats.reportError).toHaveBeenCalledWith('kinesis', { statusCode: 500 }, 'test-stream');
});

test("the consumer is stopped when the shards state can't be resolved on start", async () => {
const consumer = new FanOutConsumer(options);
getShardsData.mockImplementationOnce(() => {
throw new Error('foo');
});
await consumer.start();
expect(warn).toHaveBeenCalledTimes(1);
expect(warn).toHaveBeenNthCalledWith(
1,
"Can't start the consumer as the state can't be resolved:",
expect.objectContaining({ message: 'foo' })
);
expect(stopConsumer).toHaveBeenCalledWith('shard-0001');
});

test('an empty array of records in the stream is not pushed outside the pipeline', async () => {
const consumer = new FanOutConsumer(options);
const start = consumer.start();
Expand Down Expand Up @@ -415,8 +429,9 @@ describe('lib/fan-out-consumer', () => {
response.emit('error', Object.assign(new Error('foo'), { code: 'UnknownOperationException' }));
await start;
expect(stream).toHaveBeenCalledTimes(2);
expect(warn).toHaveBeenNthCalledWith(1, 'Subscription unsuccessful: 500');
expect(warn).toHaveBeenNthCalledWith(
1,
2,
[
'Trying to recover from AWS.Kinesis error…',
'- Message: Unexpected Server Error',
Expand All @@ -425,7 +440,45 @@ describe('lib/fan-out-consumer', () => {
'- Stream: test-stream'
].join('\n\t')
);
expect(warn).toHaveBeenNthCalledWith(2, 'Waiting before retrying the pipeline…');
expect(warn).toHaveBeenCalledTimes(2);
expect(warn).toHaveBeenNthCalledWith(3, 'Waiting before retrying the pipeline…');
expect(warn).toHaveBeenCalledTimes(3);
});

test('the consumer will recreate the pipeline on ARN is in use errors', async () => {
const consumer = new FanOutConsumer({ ...options, checkpoint: '1' });
const start = consumer.start();
await nextTickWait();
let { response } = got.getMocks();
response.emit('response', {
headers: { 'content-type': 'application/json' },
statusCode: 500
});
response.push(
JSON.stringify({
__type: 'ResourceInUseException',
message: 'Another active subscription exists for consumer "foo"'
})
);
await nextTickWait();
await nextTickWait();
const gotMocks = got.getMocks();
({ response } = gotMocks);
const { stream } = gotMocks;
response.emit('error', Object.assign(new Error('foo'), { code: 'UnknownOperationException' }));
await start;
expect(stream).toHaveBeenCalledTimes(2);
expect(warn).toHaveBeenNthCalledWith(1, 'Subscription unsuccessful: 500');
expect(warn).toHaveBeenNthCalledWith(
2,
[
'Trying to recover from AWS.Kinesis error…',
'- Message: Another active subscription exists for consumer "foo"',
'- Request ID: undefined',
'- Code: ResourceInUseException (500)',
'- Stream: test-stream'
].join('\n\t')
);
expect(warn).toHaveBeenNthCalledWith(3, 'Waiting before retrying the pipeline…');
expect(warn).toHaveBeenCalledTimes(3);
});
});
13 changes: 13 additions & 0 deletions lib/polling-consumer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,19 @@ describe('lib/polling-consumer', () => {
await expect(consumer.start()).resolves.toBeUndefined();
});

test('the consumer stops when failing to resolve the shard state on start', async () => {
getShardsData.mockImplementationOnce(() => {
throw new Error('foo');
});
const consumer = new PollingConsumer(options);
await consumer.start();
expect(warn).toHaveBeenCalledWith(
"Can't start the consumer as the state can't be resolved:",
expect.objectContaining({ message: 'foo' })
);
expect(stopConsumer).toHaveBeenCalledWith('shardId-0000');
});

test('an iterator for the latest records replaces an invalid checkpoint', done => {
const error = Object.assign(new Error('foo'), { code: 'InvalidArgumentException' });
getShardIterator.mockRejectedValueOnce(error);
Expand Down
3 changes: 2 additions & 1 deletion lib/state-store.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ describe('lib/state-store', () => {
},
enhancedConsumers: {
'enhanced-consumer-1': { isUsedBy: 'consumer-2' },
'enhanced-consumer-2': { isUsedBy: 'consumer-3' }
'enhanced-consumer-2': { isUsedBy: 'consumer-3' },
'enhanced-consumer-3': { isUsedBy: null }
},
version: '0000'
}
Expand Down
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
},
"dependencies": {
"async-retry": "^1.2.3",
"aws-sdk": "^2.514.0",
"aws-sdk": "^2.516.0",
"aws4": "^1.8.0",
"fast-deep-equal": "^2.0.1",
"got": "^9.6.0",
Expand Down Expand Up @@ -91,7 +91,7 @@
},
"@lifion/core-commons": {
"template": "public",
"updated": "2019-08-20T20:54:06.146Z",
"version": "2.2.0"
"updated": "2019-08-23T18:18:36.111Z",
"version": "2.3.1"
}
}

0 comments on commit f02d3d0

Please sign in to comment.