Skip to content

Commit

Permalink
Add a Kinesis records decoder
Browse files Browse the repository at this point in the history
  • Loading branch information
eaviles committed Oct 10, 2018
1 parent a641c0d commit c695f98
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 19 deletions.
15 changes: 15 additions & 0 deletions lib/compression.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
'use strict';

const { decompressAsync } = require('lzutf8');

module.exports = {
'LZ-UTF8': {
decompress: input =>
new Promise((resolve, reject) => {
decompressAsync(input, { inputEncoding: 'Base64', useWebWorker: false }, (output, err) => {
if (!err) resolve(output);
else reject(err);
});
})
}
};
4 changes: 3 additions & 1 deletion lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ function internal(instance) {
class Kinesis {
constructor(options = {}) {
const {
compression,
consumerName,
createStreamIfNeeded = true,
encryption,
Expand Down Expand Up @@ -44,6 +45,7 @@ class Kinesis {
}

Object.assign(internal(this), {
compression,
consumerName,
createStreamIfNeeded,
encryption,
Expand All @@ -66,7 +68,7 @@ class Kinesis {
if (encryption) await stream.encrypt(ctx);
if (tags) await stream.tag(ctx);
ctx.consumerArn = await consumer.activate(ctx);
ctx.shards = await stream.getShards(ctx);
ctx.shards = (await stream.getShards(ctx)) || [];

logger.debug(`Creating subscribers for the stream shards using "${consumerName}"…`);
ctx.shardSubscribers = ctx.shards.map(shard =>
Expand Down
5 changes: 5 additions & 0 deletions lib/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,16 @@ describe('lib/index', () => {
await kinesis.connect();
expect(stream.activate).toBeCalledWith({
client: expect.any(Object),
compression: undefined,
consumerArn: undefined,
consumerName,
createStreamIfNeeded: true,
encryption: undefined,
logger: expect.any(Object),
options: {},
shardCount: 1,
shardSubscribers: [],
shards: [],
streamArn: undefined,
streamName: 'bar',
tags: undefined
Expand All @@ -83,6 +86,7 @@ describe('lib/index', () => {
expect(logger).toEqual({ debug: noop, error: noop });
expect(noop.mock.calls).toEqual([
['Trying to connect the client…'],
['Creating subscribers for the stream shards using "foo"…'],
['The client is now connected.']
]);
});
Expand All @@ -93,6 +97,7 @@ describe('lib/index', () => {
expect(utils.noop).not.toBeCalled();
expect(logger.debug.mock.calls).toEqual([
['Trying to connect the client…'],
['Creating subscribers for the stream shards using "foo"…'],
['The client is now connected.']
]);
expect(() => new Kinesis({ logger })).toThrow();
Expand Down
58 changes: 58 additions & 0 deletions lib/records-decoder.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
'use strict';

const { Transform } = require('stream');
const compressionLibs = require('./compression');
const { isJson } = require('./utils');

const privateData = new WeakMap();

function internal(instance) {
if (!privateData.has(instance)) privateData.set(instance, {});
return privateData.get(instance);
}

class RecordsDecoder extends Transform {
constructor(options) {
super({ objectMode: true });
const { compression } = options;
const compressionLib = compression && compressionLibs[compression];
Object.assign(internal(this), { ...options, compressionLib });
}

async _transform({ headers, payload }, encoding, callback) {
const { compressionLib, logger } = internal(this);
const msgType = headers[':message-type'];
const eventType = headers[':event-type'];

if (msgType !== 'event') {
this.emit('error', new Error(`Unknown event stream message type "${msgType}".`));
return;
}

if (eventType === 'SubscribeToShardEvent') {
try {
const decodedRecords = await Promise.all(
payload.Records.map(async record => {
const { Data } = record;
let parsedData;
if (compressionLib) parsedData = await compressionLib.decompress(Data);
else parsedData = Buffer.from(Data).toString('utf8');
if (isJson(parsedData)) parsedData = JSON.parse(parsedData);
return { ...record, Data: parsedData };
})
);
this.push({ ...payload, Records: decodedRecords });
callback();
return;
} catch (err) {
this.emit('error', err);
}
}

logger.debug(`Emiting "${eventType}"…`);
this.emit(eventType, payload);
callback();
}
}

module.exports = RecordsDecoder;
4 changes: 3 additions & 1 deletion lib/shard-subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const aws4 = require('aws4');
const got = require('got');
const { Parser } = require('lifion-aws-event-stream');
const { pipeline, Writable } = require('stream');
const Decoder = require('./records-decoder');

const AWS_API_TARGET = 'Kinesis_20131202.SubscribeToShard';
const AWS_EVENT_STREAM = 'application/vnd.amazon.eventstream';
Expand Down Expand Up @@ -68,7 +69,8 @@ class ShardSubscriber {

pipeline(
stream,
new Parser(ctx),
new Parser(),
new Decoder(ctx),
new Writable({
objectMode: true,
write(chunk, encoding, callback) {
Expand Down
1 change: 1 addition & 0 deletions lib/stream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ describe('lib/stream', () => {
expect(stream).toEqual({
activate: expect.any(Function),
encrypt: expect.any(Function),
getShards: expect.any(Function),
tag: expect.any(Function)
});
});
Expand Down
4 changes: 4 additions & 0 deletions lib/utils.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
'use strict';

const isJsonRegex = /^[{[].*[}\]]$/;

module.exports.isJson = input => isJsonRegex.test(input);

module.exports.noop = () => {};

module.exports.wait = ms =>
Expand Down
11 changes: 10 additions & 1 deletion lib/utils.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,25 @@

const utils = require('./utils');

const { noop, wait } = utils;
const { isJson, noop, wait } = utils;

describe('lib/utils', () => {
test('the module exports the expected', () => {
expect(utils).toEqual({
isJson: expect.any(Function),
noop: expect.any(Function),
wait: expect.any(Function)
});
});

test('the isJson function returns true when called with a JSON', () => {
expect(isJson(JSON.stringify({ foo: 'bar' }))).toBe(true);
});

test('the isJson function returns false when called with a non-JSON string', () => {
expect(isJson('{')).toBe(false);
});

test('the noop function can be used to default functions in options', () => {
const { foo = noop } = {};
expect(() => foo()).not.toThrow();
Expand Down
29 changes: 17 additions & 12 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,20 @@
"version": "auto-changelog -p && git add CHANGELOG.md"
},
"dependencies": {
"aws-sdk": "^2.322.0",
"aws-sdk": "^2.331.0",
"aws4": "^1.8.0",
"fast-deep-equal": "^2.0.1",
"got": "^9.2.2",
"lifion-aws-event-stream": "^1.0.0"
"lifion-aws-event-stream": "^1.0.0",
"lzutf8": "^0.5.5"
},
"devDependencies": {
"auto-changelog": "^1.8.0",
"auto-changelog": "^1.8.1",
"chalk": "^2.4.1",
"check-engines": "^1.5.0",
"codecov": "^3.1.0",
"eslint": "^5.6.1",
"eslint-config-lifion": "^1.0.1",
"eslint-config-lifion": "^1.0.2",
"husky": "^1.1.1",
"jest": "^23.6.0",
"jest-junit": "^5.2.0",
Expand Down

0 comments on commit c695f98

Please sign in to comment.