Skip to content

Commit

Permalink
Normalize the stream and table modules
Browse files Browse the repository at this point in the history
  • Loading branch information
eaviles committed Apr 12, 2019
1 parent e64136d commit 3400bcd
Show file tree
Hide file tree
Showing 16 changed files with 474 additions and 273 deletions.
74 changes: 33 additions & 41 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,66 +58,58 @@ pipeline([
## API Reference

- [lifion-kinesis](#module_lifion-kinesis)
- [Kinesis](#exp_module_lifion-kinesis--Kinesis)[<code>Readable</code>](https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_readable_streams)
- [Kinesis](#exp_module_lifion-kinesis--Kinesis) ⇐ <code>external:Readable</code> ⏏
- [new Kinesis(options)](#new_module_lifion-kinesis--Kinesis_new)
- [.start()](#module_lifion-kinesis--Kinesis+start) ⇒ <code>Promise</code>
- [.start()](#module_lifion-kinesis--Kinesis+start)

<a name="exp_module_lifion-kinesis--Kinesis"></a>

### Kinesis ⇐ [<code>Readable</code>](https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_readable_streams)
### Kinesis ⇐ <code>external:Readable</code> ⏏

A specialization of the [readable stream](https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_readable_streams) class implementing a
consumer of Kinesis Data Streams using the
[enhanced fan-out feature](https://docs.aws.amazon.com/streams/latest/dev/introduction-to-enhanced-consumers.html). Upon connection, instances of this
class will subscribe to receive data from all the shards of the given stream. Incoming data can
be retrieved through either the `data` event or by piping the instance to a writable stream.
A [pass-through stream](https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_class_stream_passthrough) class specialization implementing a
consumer of Kinesis Data Streams using the [AWS SDK for JavaScript](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest).
Incoming data can be retrieved through either the `data` event or by piping the instance to a
writable stream.

**Kind**: Exported class
**Extends**: [<code>Readable</code>](https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_readable_streams)
**Extends**: <code>external:Readable</code>
<a name="new_module_lifion-kinesis--Kinesis_new"></a>

#### new Kinesis(options)

Initializes a new instance of the Kinesis client.

| Param | Type | Default | Description |
| -------------------------------- | -------------------- | ------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| options | <code>Object</code> | | The initialization options. In addition to the below options, this object can also contain the [`AWS.Kinesis` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#constructor-property). |
| [options.compression] | <code>string</code> | | The kind of data compression to use with records. The currently available compression options are either `"LZ-UTF8"` or none. |
| [options.consumerGroup] | <code>string</code> | | The name of the group of consumers in which shards will be distributed and checkpoints will be shared. If not provided, it defaults to the name of the application/project using this module. |
| [options.consumerName] | <code>string</code> | | The unique name of the consumer for the given stream. This option is required if `options.useEnhancedFanOut` is true. |
| [options.createStreamIfNeeded] | <code>boolean</code> | <code>false</code> | Whether if the Kinesis stream should be created if it doesn't exist upon connection. |
| [options.encryption] | <code>Object</code> | | The encryption options to enforce in the stream. |
| [options.encryption.type] | <code>string</code> | | The encryption type to use. |
| [options.encryption.keyId] | <code>string</code> | | The GUID for the customer-managed AWS KMS key to use for encryption. This value can be a globally unique identifier, a fully specified ARN to either an alias or a key, or an alias name prefixed by "alias/". |
| [options.logger] | <code>Object</code> | | An object with the `warn`, `debug`, and `error` functions that will be used for logging purposes. If not provided, logging will be omitted. |
| [options.shardCount] | <code>number</code> | <code>1</code> | The number of shards that the newly-created stream will use (if the `createStreamIfNeeded` option is set). |
| options.streamName | <code>string</code> | | The name of the stream to consume data from. This option is required. |
| [options.tags] | <code>Object</code> | | If provided, the client will ensure that the stream is tagged with these hash of tags upon connection. If the stream is already tagged same tag keys, they won't be overriden. If the stream is already tagged with different tag keys, they won't be removed. |
| [options.useAutoCheckpoints] | <code>boolean</code> | <code>true</code> | Set to true to automatically checkpoint as messages are reported back to consumers of the client. |
| [options.useAutoShardAssignment] | <code>boolean</code> | <code>true</code> | Set to true to automatically assign the stream shards to all the active clients so only one client reads from one shard at the same time. Set to false to make the client read from all shards. |
| [options.useEnhancedFanOut] | <code>boolean</code> | <code>false</code> | Set to true to make the client use enhanced fan-out consumers to read from shards. |
| Param | Type | Default | Description |
| -------------------------------- | -------------------- | ------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| options | <code>Object</code> | | The initialization options. In addition to the below options, it can also contain any of the [`AWS.Kinesis` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#constructor-property). |
| [options.compression] | <code>string</code> | | The kind of data compression to use with records. The currently available compression options are either `"LZ-UTF8"` or none. |
| [options.consumerGroup] | <code>string</code> | | The name of the group of consumers in which shards will be distributed and checkpoints will be shared. If not provided, it defaults to the name of the application/project using this module. |
| [options.createStreamIfNeeded] | <code>boolean</code> | <code>true</code> | Whether if the Kinesis stream should be automatically created if it doesn't exist upon connection |
| [options.dynamoDb] | <code>Object</code> | <code>{}</code> | The initialization options for the DynamoDB client used to store the state of the stream consumers. In addition to `tableNames` and `tags`, it can also contain any of the [`AWS.DynamoDB` options](AwsJsSdkDynamoDb). |
| [options.dynamoDb.tableName] | <code>string</code> | | The name of the table in which to store the state of consumers. If not provided, it defaults to "lifion-kinesis-state". |
| [options.dynamoDb.tags] | <code>Object</code> | | If provided, the client will ensure that the DynamoDB table where the state is stored is tagged with these tags. If the table already has tags, they will be merged. |
| [options.encryption] | <code>Object</code> | | The encryption options to enforce in the stream. |
| [options.encryption.type] | <code>string</code> | | The encryption type to use. |
| [options.encryption.keyId] | <code>string</code> | | The GUID for the customer-managed AWS KMS key to use for encryption. This value can be a globally unique identifier, a fully specified ARN to either an alias or a key, or an alias name prefixed by "alias/". |
| [options.logger] | <code>Object</code> | | An object with the `warn`, `debug`, and `error` functions that will be used for logging purposes. If not provided, logging will be omitted. |
| [options.shardCount] | <code>number</code> | <code>1</code> | The number of shards that the newly-created stream will use (if the `createStreamIfNeeded` option is set). |
| options.streamName | <code>string</code> | | The name of the stream to consume data from. This option is required. |
| [options.tags] | <code>Object</code> | | If provided, the client will ensure that the stream is tagged with these tags upon connection. If the stream is already tagged, the existing tags will be merged with the provided ones before updating them. |
| [options.useAutoCheckpoints] | <code>boolean</code> | <code>true</code> | Set to `true` to automatically store shard checkpoints for the consumer group as messages are reported as processed. |
| [options.useAutoShardAssignment] | <code>boolean</code> | <code>true</code> | Set to `true` to automatically assign the stream shards to the active consumers in the same group (so only one client reads from one shard at the same time). Set to `false` to make the client read from all shards. |
| [options.useEnhancedFanOut] | <code>boolean</code> | <code>false</code> | Set to `true` to make the client use enhanced fan-out consumers to read from shards. |

<a name="module_lifion-kinesis--Kinesis+start"></a>

#### kinesis.start() ⇒ <code>Promise</code>
#### kinesis.start()

Initializes the Kinesis client, then it proceeds to:

1. Create the stream if asked for.
2. Ensure that the stream is active.
3. Ensure that the stream is encrypted as indicated.
4. Ensure that the stream is tagged as requested.
5. Ensure an enhanced fan-out consumer with the given name exists.
6. Ensure that the enhanced fan-out consumer is active.
7. A subscription for data is issued to all the shard in the stream.
8. Data will then be available in both [stream read modes](external:readModes).
Initializes the client, by ensuring that the stream exists, it's ready, and configured as
requested. The internal managers that deal with heartbeats, state, and consumers will also
be started.

**Kind**: instance method of [<code>Kinesis</code>](#exp_module_lifion-kinesis--Kinesis)
**Returns**: <code>Promise</code> - nothing
**Fulfil**: Once the stream is active, encrypted, tagged, the enhanced fan-out consumer is active,
and the client is subscribed to the data in all the stream shards.
**Reject**: <code>Error</code> - If at least one of the above steps fails to succeed.
**Fulfil**: Once the client has successfully started.
**Reject**: <code>Error</code> - On any unexpected error while trying to start.

## License

Expand Down
18 changes: 6 additions & 12 deletions lib/compression.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,13 @@ const { decompressAsync } = require('lzutf8');

module.exports = {
'LZ-UTF8': {
decompress: input =>
decompress: (input, inputEncoding = 'Base64') =>
new Promise((resolve, reject) => {
decompressAsync(
input,
{
inputEncoding: 'Base64',
useWebWorker: false
},
(output, err) => {
if (!err) resolve(output);
else reject(err);
}
);
const options = { inputEncoding, useWebWorker: false };
decompressAsync(input, options, (output, err) => {
if (!err) resolve(output);
else reject(err);
});
})
}
};
85 changes: 85 additions & 0 deletions lib/consumers-manager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
'use strict';

const PollingConsumer = require('./polling-consumer');

const privateData = new WeakMap();

function internal(instance) {
if (!privateData.has(instance)) privateData.set(instance, {});
return privateData.get(instance);
}

class ConsumersManager {
constructor(options) {
const {
client,
compression,
streamName,
logger,
pushToStream,
stateStore,
useEnhancedFanOut
} = options;

Object.assign(internal(this), {
client,
compression,
consumers: {},
logger,
stateStore,
pushToStream,
streamName,
useEnhancedFanOut
});
}

async reconcile() {
const {
client,
consumers,
compression,
pushToStream,
logger,
stateStore,
streamName,
useEnhancedFanOut
} = internal(this);

logger.debug('Reconciling shard consumers…');

if (useEnhancedFanOut) {
throw new Error('The fan-out consumers are not supported yet.');
}

const ownedShards = await stateStore.getOwnedShards();
const ownedShardIds = Object.keys(ownedShards);

// Start consumers for the shards the consumer owns.
ownedShardIds.forEach(shardId => {
if (!consumers[shardId]) {
const shard = ownedShards[shardId];
const consumer = new PollingConsumer({
client,
compression,
pushToStream,
logger,
shardId,
streamName,
...shard
});
consumers[shardId] = consumer;
consumer.start();
}
});

// Stop the consumers whose leases were lost.
Object.keys(consumers)
.filter(shardId => !ownedShards[shardId])
.forEach(async shardId => {
await consumers[shardId].stop();
consumers[shardId] = undefined;
});
}
}

module.exports = ConsumersManager;
File renamed without changes.
File renamed without changes.
4 changes: 2 additions & 2 deletions lib/consumer.test.js → lib/fan-out-consumer.test.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
'use strict';

const { Kinesis, mockClear, mockConsumers } = require('aws-sdk');
const consumer = require('./consumer');
const consumer = require('./fan-out-consumer');

describe('lib/consumer', () => {
describe('lib/fan-out-consumer', () => {
let client;
let logger;
let ctx;
Expand Down
Loading

0 comments on commit 3400bcd

Please sign in to comment.