Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhanced Fan-Out Support #1

Merged
merged 24 commits into from
Nov 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
4faac4d
Ensure streams are created, encrypted, and tagged
eaviles Sep 20, 2018
3ce84f9
Add management of stream consumers, refactor stream management
eaviles Sep 21, 2018
9894deb
Add basic CircleCI integration
eaviles Sep 24, 2018
743f512
Merge branch 'develop' into feature/enhanced-fan-out
eaviles Sep 24, 2018
23ac7b7
Correct the package-lock.json file
eaviles Sep 24, 2018
4ecd99f
Merge branch 'develop' into feature/enhanced-fan-out
eaviles Sep 24, 2018
4f9fad8
Add test coverage for lib/utils
eaviles Sep 24, 2018
40f156f
Remove the check-dependencies script in favor of Renovate
eaviles Sep 24, 2018
562bea1
Add test coverage
eaviles Sep 25, 2018
39f098d
Make the tag test cases more specific
eaviles Sep 25, 2018
a9765f1
Upgrade aws-sdk, eslint-config-lifion, and eslint
eaviles Sep 25, 2018
e5588a6
Add enhanced-fanout shard subscriber and initial parser
eaviles Sep 27, 2018
e460ec9
Merge branch 'develop' into feature/enhanced-fan-out
eaviles Sep 27, 2018
96c3ddb
Merge branch 'develop' into feature/enhanced-fan-out
eaviles Oct 5, 2018
972c7f6
Adopt lifion-aws-event-stream
eaviles Oct 5, 2018
a641c0d
Make sure the CI stops if the tests fail
eaviles Oct 5, 2018
c695f98
Add a Kinesis records decoder
eaviles Oct 10, 2018
d30f709
Merge branch 'develop' into feature/enhanced-fan-out
eaviles Oct 10, 2018
cd5d904
Connect the parsed records into the client
eaviles Oct 11, 2018
1bb389b
Include the shard ID in the piped records
eaviles Oct 12, 2018
0dd328a
Recreate the shard subscription stream when expiring
eaviles Oct 12, 2018
bdcf22f
Merge branch 'develop' into feature/enhanced-fan-out
eaviles Oct 12, 2018
672687f
Merge branch 'develop' into feature/enhanced-fan-out
eaviles Nov 20, 2018
b5b9ce1
Set NVM to use the current LTS
eaviles Nov 20, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
- restore_cache:
keys:
- dependencies-{{ checksum "package-lock.json" }}
- run: npm install
- run: npm ci
- save_cache:
key: dependencies-{{ checksum "package-lock.json" }}
paths:
Expand All @@ -33,7 +33,7 @@ jobs:
- restore_cache:
keys:
- dependencies-{{ checksum "package-lock.json" }}
- run: npm install
- run: npm ci
- save_cache:
key: dependencies-{{ checksum "package-lock.json" }}
paths:
Expand Down
9 changes: 2 additions & 7 deletions .npmignore
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
*.test.js
.config
/.eslintcache
/.eslintignore
/.nvmrc
/CODE_OF_CONDUCT.md
/CONTRIBUTING.md
/.*
/docs
/reports
/scripts
/templates
__mocks__
2 changes: 1 addition & 1 deletion .nvmrc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
stable
lts/*
27 changes: 27 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
## Changelog 🚀

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

Generated by [`auto-changelog`](https://github.com/CookPete/auto-changelog).

### 0.0.0 (2018-09-25)

- [`#5`](https://github.com/lifion/lifion-kinesis/pull/5): Update dependency npm-watch to ^0.4.0
- [`#2`](https://github.com/lifion/lifion-kinesis/pull/2): Configure Renovate
- [`39f098d`](https://github.com/lifion/lifion-kinesis/commit/39f098de613494f6eea0c5d055b5025451e73d49): Make the tag test cases more specific
- [`562bea1`](https://github.com/lifion/lifion-kinesis/commit/562bea1e0b9c31baacbe1c013e81b3dbde692cae): Add test coverage
- [`40f156f`](https://github.com/lifion/lifion-kinesis/commit/40f156f259460b5801c5ecf0d221fc4b5c510138): Remove the check-dependencies script in favor of Renovate
- [`4f9fad8`](https://github.com/lifion/lifion-kinesis/commit/4f9fad8343a21803185627378b5aedd7cb93598d): Add test coverage for lib/utils
- [`07fd3aa`](https://github.com/lifion/lifion-kinesis/commit/07fd3aa183fd4f593d8fb73b773e9e618adb6740): Delete renovate.json
- [`ce26fc3`](https://github.com/lifion/lifion-kinesis/commit/ce26fc3a54fbce7e507fbc5b5a047ab0fa5ae11e): Change the Renovate configuration
- [`c8d578e`](https://github.com/lifion/lifion-kinesis/commit/c8d578efbd09d2434e4d4f480167818cc3945423): Add renovate.json
- [`23ac7b7`](https://github.com/lifion/lifion-kinesis/commit/23ac7b709c44d5e9a5c6f45ce469447fe62b8daf): Correct the package-lock.json file
- [`5823729`](https://github.com/lifion/lifion-kinesis/commit/582372932105d96124226726193a7fb22068de27): Add integration with CircleCI
- [`9894deb`](https://github.com/lifion/lifion-kinesis/commit/9894debffbae98b885c6bdb2f8cb73074e8edbc0): Add basic CircleCI integration
- [`3ce84f9`](https://github.com/lifion/lifion-kinesis/commit/3ce84f9258a722e6ed5feb7a5990703615fdadfa): Add management of stream consumers, refactor stream management
- [`4faac4d`](https://github.com/lifion/lifion-kinesis/commit/4faac4db09d8e7e48f0ffe3456550745371f16a3): Ensure streams are created, encrypted, and tagged
- [`0ffdc6a`](https://github.com/lifion/lifion-kinesis/commit/0ffdc6a628e02c453b0eaf2d049ae4b12371599d): Rename project to lifion-kinesis
- [`1be561c`](https://github.com/lifion/lifion-kinesis/commit/1be561ca8ed67539c6bae1a3d20310639d664be4): Initial commit
113 changes: 113 additions & 0 deletions lib/__mocks__/aws-sdk.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
'use strict';

const mockData = {};

function resetMockData() {
mockData.Consumers = [];
mockData.Streams = [];
}

const addTagsToStream = jest.fn(() => ({ promise: () => Promise.resolve() }));

const createStream = jest.fn(params => {
const { StreamName } = params;
const Stream = {
StreamName,
StreamStatus: 'CREATING',
StreamARN: [
'arn:aws:kinesis:us-east-1',
Math.floor(Math.random() * 1e12),
`stream/${StreamName}`
].join(':')
};
mockData.Streams.push(Stream);
return { promise: () => Promise.resolve({}) };
});

const describeStream = jest.fn(params => {
const { StreamName } = params;
const StreamDescription = mockData.Streams.find(i => i.StreamName === StreamName);
if (!StreamDescription) {
const err = new Error("The stream doesn't exists.");
err.code = 'ResourceNotFoundException';
return { promise: () => Promise.reject(err) };
}
return { promise: () => Promise.resolve({ StreamDescription }) };
});

const listStreamConsumers = jest.fn(() => {
const { Consumers } = mockData;
return { promise: () => Promise.resolve({ Consumers }) };
});

const listTagsForStream = jest.fn(params => {
const { StreamName } = params;
const { Tags = [] } = mockData.Streams.find(i => i.StreamName === StreamName);
return { promise: () => Promise.resolve({ Tags }) };
});

const registerStreamConsumer = jest.fn(params => {
const { ConsumerName } = params;
const Consumer = {
ConsumerARN: [
'arn:aws:kinesis:us-east-1',
Math.floor(Math.random() * 1e12),
`stream/test/consumer/${ConsumerName.toLowerCase()}`,
Math.floor(Math.random() * 1e12)
].join(':'),
ConsumerName,
ConsumerStatus: 'ACTIVE'
};
mockData.Consumers.push(Consumer);
return {
promise: () => Promise.resolve({ Consumer: { ...Consumer, ConsumerStatus: 'CREATING' } })
};
});

const startStreamEncryption = jest.fn(() => ({ promise: () => Promise.resolve({}) }));

const waitFor = jest.fn((state, { StreamName }) => {
const StreamDescription = mockData.Streams.find(i => i.StreamName === StreamName);
return { promise: () => Promise.resolve({ StreamDescription }) };
});

const Kinesis = jest.fn(() => ({
addTagsToStream,
createStream,
describeStream,
listStreamConsumers,
listTagsForStream,
registerStreamConsumer,
startStreamEncryption,
waitFor
}));

function mockClear() {
addTagsToStream.mockClear();
createStream.mockClear();
describeStream.mockClear();
listStreamConsumers.mockClear();
listTagsForStream.mockClear();
registerStreamConsumer.mockClear();
startStreamEncryption.mockClear();
waitFor.mockClear();
Kinesis.mockClear();
resetMockData();
}

function mockConsumers() {
return mockData.Consumers;
}

function mockStreams() {
return mockData.Streams;
}

resetMockData();

module.exports = {
Kinesis,
mockClear,
mockConsumers,
mockStreams
};
15 changes: 15 additions & 0 deletions lib/compression.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
'use strict';

const { decompressAsync } = require('lzutf8');

module.exports = {
'LZ-UTF8': {
decompress: input =>
new Promise((resolve, reject) => {
decompressAsync(input, { inputEncoding: 'Base64', useWebWorker: false }, (output, err) => {
if (!err) resolve(output);
else reject(err);
});
})
}
};
71 changes: 71 additions & 0 deletions lib/consumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/* eslint-disable no-await-in-loop */

'use strict';

const { wait } = require('./utils');

const CONSUMER_MAX_STATE_CHECKS = 18;
const CONSUMER_STATE_CHECK_DELAY = 10000;

module.exports.activate = async ctx => {
const {
client,
consumerName: ConsumerName,
logger,
streamArn: StreamARN,
streamName: StreamName
} = ctx;

logger.debug(`Checking if the "${ConsumerName}" consumer for "${StreamName}" exists…`);

async function describeStreamConsumer() {
const { Consumers } = await client.listStreamConsumers({ StreamARN }).promise();
return Consumers.find(i => i.ConsumerName === ConsumerName) || {};
}

const consumer = await describeStreamConsumer(ConsumerName, client, StreamARN);
let { ConsumerStatus, ConsumerARN } = consumer;

if (ConsumerStatus === 'ACTIVE') {
logger.debug('The stream consumer exists already and is active.');
}

if (ConsumerStatus === 'DELETING') {
logger.debug('Waiting for the stream consumer to complete deletion…');
let checks = 0;
while ((await describeStreamConsumer()).ConsumerStatus) {
await wait(CONSUMER_STATE_CHECK_DELAY);
checks += 1;
if (checks > CONSUMER_MAX_STATE_CHECKS) {
const errMsg = `Consumer "${ConsumerName}" exceeded the maximum wait time for deletion.`;
logger.error(errMsg);
throw new Error(errMsg);
}
}
logger.debug('The stream consumer is now gone.');
ConsumerStatus = '';
}

if (!ConsumerStatus) {
logger.debug('Trying to register the consumer…');
const { Consumer } = await client.registerStreamConsumer({ ConsumerName, StreamARN }).promise();
({ ConsumerStatus, ConsumerARN } = Consumer);
}

if (ConsumerStatus === 'CREATING') {
logger.debug('Waiting until the stream consumer is active…');
let checks = 0;
while ((await describeStreamConsumer()).ConsumerStatus !== 'ACTIVE') {
await wait(CONSUMER_STATE_CHECK_DELAY);
checks += 1;
if (checks > CONSUMER_MAX_STATE_CHECKS) {
const errMsg = `Consumer "${ConsumerName}" exceeded the maximum wait time for activation.`;
logger.error(errMsg);
throw new Error(errMsg);
}
}
logger.debug('The stream consumer is now active.');
}

return ConsumerARN;
};
Loading