Skip to content

Commit

Permalink
Connect the parsed records into the client
Browse files Browse the repository at this point in the history
  • Loading branch information
eaviles committed Oct 11, 2018
1 parent d30f709 commit cd5d904
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 24 deletions.
12 changes: 8 additions & 4 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

const { Kinesis: AwsKinesis } = require('aws-sdk');
const { PassThrough } = require('stream');
const ShardSubscriber = require('./shard-subscriber');
const consumer = require('./consumer');
const stream = require('./stream');
Expand All @@ -13,8 +14,9 @@ function internal(instance) {
return privateData.get(instance);
}

class Kinesis {
class Kinesis extends PassThrough {
constructor(options = {}) {
super({ objectMode: true });
const {
compression,
consumerName,
Expand Down Expand Up @@ -71,9 +73,11 @@ class Kinesis {
ctx.shards = (await stream.getShards(ctx)) || [];

logger.debug(`Creating subscribers for the stream shards using "${consumerName}"…`);
ctx.shardSubscribers = ctx.shards.map(shard =>
new ShardSubscriber({ ...ctx, emitter: this, shard }).start()
);
ctx.shards.forEach(shard => {
const subscriber = new ShardSubscriber({ ...ctx, emitter: this, shard });
subscriber.start();
subscriber.pipe(this);
});

logger.debug('The client is now connected.');
}
Expand Down
29 changes: 9 additions & 20 deletions lib/shard-subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const AWS = require('aws-sdk');
const aws4 = require('aws4');
const got = require('got');
const { Parser } = require('lifion-aws-event-stream');
const { pipeline, Writable } = require('stream');
const { pipeline, PassThrough } = require('stream');
const Decoder = require('./records-decoder');

const AWS_API_TARGET = 'Kinesis_20131202.SubscribeToShard';
Expand All @@ -18,8 +18,9 @@ function internal(instance) {
return privateData.get(instance);
}

class ShardSubscriber {
class ShardSubscriber extends PassThrough {
constructor(ctx) {
super({ objectMode: true });
const { options } = ctx;
const { endpoint = 'https://kinesis.us-east-1.amazonaws.com', region } = options;
const credentialsChain = new AWS.CredentialProviderChain();
Expand Down Expand Up @@ -67,24 +68,12 @@ class ShardSubscriber {
} else logger.debug('Subscription to shard is successful.');
});

pipeline(
stream,
new Parser(),
new Decoder(ctx),
new Writable({
objectMode: true,
write(chunk, encoding, callback) {
logger.debug(chunk);
callback();
}
}),
err => {
if (err) {
logger.error(err);
request.abort();
} else logger.debug('Subscription pipeline completed.');
}
);
pipeline([stream, new Parser(), new Decoder(ctx), this], err => {
if (err) {
logger.error(err.stack);
request.abort();
} else logger.debug('Subscription pipeline completed.');
});

return this;
}
Expand Down

0 comments on commit cd5d904

Please sign in to comment.