Skip to content

Commit

Permalink
Re-factor records encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
eaviles committed Apr 24, 2019
1 parent a1eeafb commit bad779e
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 89 deletions.
32 changes: 16 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ pipeline([
- [Kinesis](#exp_module_lifion-kinesis--Kinesis) ⇐ <code>external:Readable</code> ⏏
- [new Kinesis(options)](#new_module_lifion-kinesis--Kinesis_new)
- [.startConsumer()](#module_lifion-kinesis--Kinesis+startConsumer) ⇒ <code>Promise</code>
- [.putRecord(options)](#module_lifion-kinesis--Kinesis+putRecord) ⇒ <code>Promise</code>
- [.putRecords(options)](#module_lifion-kinesis--Kinesis+putRecords) ⇒ <code>Promise</code>
- [.putRecord(params)](#module_lifion-kinesis--Kinesis+putRecord) ⇒ <code>Promise</code>
- [.putRecords(params)](#module_lifion-kinesis--Kinesis+putRecords) ⇒ <code>Promise</code>

<a name="exp_module_lifion-kinesis--Kinesis"></a>

Expand All @@ -84,7 +84,7 @@ Initializes a new instance of the Kinesis client.
| Param | Type | Default | Description |
| -------------------------------- | -------------------- | ------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| options | <code>Object</code> | | The initialization options. In addition to the below options, it can also contain any of the [`AWS.Kinesis` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#constructor-property). |
| [options.compression] | <code>string</code> | | The kind of data compression to use with records. The currently available compression options are either `"LZ-UTF8"` or none. |
| [options.compression] | <code>string</code> | | The kind of data compression to use wit records. The currently available compression options are either `"LZ-UTF8"` or none. |
| [options.consumerGroup] | <code>string</code> | | The name of the group of consumers in which shards will be distributed and checkpoints will be shared. If not provided, it defaults to the name of the application/project using this module. |
| [options.createStreamIfNeeded] | <code>boolean</code> | <code>true</code> | Whether if the Kinesis stream should be automatically created if it doesn't exist upon connection |
| [options.dynamoDb] | <code>Object</code> | <code>{}</code> | The initialization options for the DynamoDB client used to store the state of the stream consumers. In addition to `tableNames` and `tags`, it can also contain any of the [`AWS.DynamoDB` options](AwsJsSdkDynamoDb). |
Expand Down Expand Up @@ -117,36 +117,36 @@ be started.
**Reject**: <code>Error</code> - On any unexpected error while trying to start.
<a name="module_lifion-kinesis--Kinesis+putRecord"></a>

#### kinesis.putRecord(options) ⇒ <code>Promise</code>
#### kinesis.putRecord(params) ⇒ <code>Promise</code>

Puts a record to a stream.

**Kind**: instance method of [<code>Kinesis</code>](#exp_module_lifion-kinesis--Kinesis)
**Fulfil**: If record is successfully pushed to stream.
**Reject**: <code>Error</code> - On any unexpected error while pushing to stream.

| Param | Type | Description |
| -------------------- | ------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| options | <code>Object</code> | The putRecord options. In addition to the params described here, uses [`AWS.Kinesis.putRecord` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#putRecord-property) from the `AWS.Kinesis.putRecord` method in camel case. |
| options.data | <code>Object</code> \| <code>string</code> | The data to be used as the Kinesis message. |
| [options.streamName] | <code>string</code> | If provided, overrides the stream name provided on client instantiation. |
| Param | Type | Description |
| ------------------- | ------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| params | <code>Object</code> | The putRecord parameters. In addition to the params described here, uses [`AWS.Kinesis.putRecord` parameters](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#putRecord-property) from the `AWS.Kinesis.putRecord` method in camel case. |
| params.data | <code>Object</code> \| <code>string</code> | The data to be used as the Kinesis message. |
| [params.streamName] | <code>string</code> | If provided, overrides the stream name provided on client instantiation. |

<a name="module_lifion-kinesis--Kinesis+putRecords"></a>

#### kinesis.putRecords(options) ⇒ <code>Promise</code>
#### kinesis.putRecords(params) ⇒ <code>Promise</code>

Batch puts multiple records to a stream.

**Kind**: instance method of [<code>Kinesis</code>](#exp_module_lifion-kinesis--Kinesis)
**Fulfil**: If records are successfully pushed to stream.
**Reject**: <code>Error</code> - On any unexpected error while pushing to stream.

| Param | Type | Description |
| -------------------- | ------------------------------------------ | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| options | <code>Object</code> | The putRecords options. In addition to the params described here, uses [`AWS.Kinesis.putRecords` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#putRecords-property) from the `AWS.Kinesis.putRecords` method in camel case. |
| options.records | <code>Array</code> | A list of records to push to a Kinesis stream. |
| options.records.data | <code>Object</code> \| <code>string</code> | The data to be used as the Kinesis message. |
| [options.streamName] | <code>string</code> | If provided, overrides the stream name provided on client instantiation. |
| Param | Type | Description |
| ------------------- | ------------------------------------------ | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| params | <code>Object</code> | The putRecords parameters. In addition to the params described here, uses [`AWS.Kinesis.putRecords` parameters](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#putRecords-property) from the `AWS.Kinesis.putRecords` method in camel case. |
| params.records | <code>Array</code> | A list of records to push to a Kinesis stream. |
| params.records.data | <code>Object</code> \| <code>string</code> | The data to be used as the Kinesis message. |
| [params.streamName] | <code>string</code> | If provided, overrides the stream name provided on client instantiation. |

## License

Expand Down
6 changes: 3 additions & 3 deletions lib/compression.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ const { compressAsync, decompressAsync } = require('lzutf8');

module.exports = {
'LZ-UTF8': {
compress: input =>
compress: (input, outputEncoding) =>
new Promise((resolve, reject) => {
const options = { outputEncoding: 'Buffer', useWebWorker: false };
const options = { outputEncoding, useWebWorker: false };
compressAsync(input, options, (output, err) => {
if (!err) resolve(output);
else reject(err);
});
}),
decompress: (input, inputEncoding = 'Base64') =>
decompress: (input, inputEncoding) =>
new Promise((resolve, reject) => {
const options = { inputEncoding, useWebWorker: false };
decompressAsync(input, options, (output, err) => {
Expand Down
2 changes: 1 addition & 1 deletion lib/fan-out-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const { Parser } = require('lifion-aws-event-stream');
const { Transform, Writable, pipeline } = require('stream');
const { promisify } = require('util');

const { RecordsDecoder } = require('./records-decoder');
const { RecordsDecoder } = require('./records');
const { safeJsonParse, wait } = require('./utils');

const AWS_API_TARGET = 'Kinesis_20131202.SubscribeToShard';
Expand Down
88 changes: 26 additions & 62 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const HeartbeatManager = require('./heartbeat-manager');
const KinesisClient = require('./kinesis-client');
const LeaseManager = require('./lease-manager');
const StateStore = require('./state-store');
const compressionLibs = require('./compression');
const { getRecordsEncoder } = require('./records');
const { name: moduleName } = require('../package.json');
const { noop } = require('./utils');

Expand Down Expand Up @@ -46,29 +46,6 @@ async function ensureStreamInitialized(instance) {
if (tags) await confirmStreamTags(privateProps);
}

async function buildRecordParams(options, compression) {
const { data, explicitHashKey, partitionKey } = options;

let normData = data;
if (data && typeof data !== 'string') {
try {
normData = JSON.stringify(data);
} catch (err) {
normData = data.toString();
}
}

const compressionLib = compression && compressionLibs[compression];
if (compressionLib) {
normData = await compressionLib.compress(normData);
}

const params = { Data: normData, PartitionKey: partitionKey || generate() };
if (explicitHashKey) params.ExplicitHashKey = explicitHashKey;

return params;
}

async function setUpEnhancedConsumers(instance) {
const { client, logger, stateStore, streamArn, streamName } = internal(instance);
logger.debug(`Cleaning up enhanced consumers for "${streamName}"…`);
Expand Down Expand Up @@ -120,7 +97,7 @@ class Kinesis extends PassThrough {
*
* @param {Object} options - The initialization options. In addition to the below options, it
* can also contain any of the [`AWS.Kinesis` options]{@link external:AwsJsSdkKinesis}.
* @param {string} [options.compression] - The kind of data compression to use with records.
* @param {string} [options.compression] - The kind of data compression to use wit records.
* The currently available compression options are either `"LZ-UTF8"` or none.
* @param {string} [options.consumerGroup] - The name of the group of consumers in which shards
* will be distributed and checkpoints will be shared. If not provided, it defaults to
Expand Down Expand Up @@ -218,6 +195,7 @@ class Kinesis extends PassThrough {
logger: normLogger,
noRecordsPollDelay: normNoRecordsPollDelay >= 250 ? normNoRecordsPollDelay : 250,
pollDelay: normPollDelay >= 0 ? normPollDelay : 250,
recordsEncoder: getRecordsEncoder(compression, 'Buffer'),
shardCount: normShardCount >= 1 ? normShardCount : 1,
streamName,
tags,
Expand Down Expand Up @@ -273,62 +251,48 @@ class Kinesis extends PassThrough {
/**
* Puts a record to a stream.
*
* @param {Object} options - The putRecord options. In addition to the params described here,
* uses [`AWS.Kinesis.putRecord` options]{@link external:AwsJsSdkKinesisPutRecord}
* @param {Object} params - The putRecord parameters. In addition to the params described here,
* uses [`AWS.Kinesis.putRecord` parameters]{@link external:AwsJsSdkKinesisPutRecord}
* from the `AWS.Kinesis.putRecord` method in camel case.
* @param {(Object|string)} options.data - The data to be used as the Kinesis message.
* @param {string} [options.streamName] - If provided, overrides the stream name provided on
* @param {(Object|string)} params.data - The data to be used as the Kinesis message.
* @param {string} [params.streamName] - If provided, overrides the stream name provided on
* client instantiation.
* @fulfil If record is successfully pushed to stream.
* @reject {Error} - On any unexpected error while pushing to stream.
* @returns {Promise}
*/
async putRecord(options) {
const { sequenceNumberForOrdering, streamName: streamNameOpt } = options;
const { client, compression, streamName } = internal(this);

async putRecord({ streamName, ...record }) {
const privateProps = internal(this);
const { client, recordsEncoder } = privateProps;
await ensureStreamInitialized(this);

const recordParams = await buildRecordParams(options, compression);
recordParams.StreamName = streamNameOpt || streamName;

if (sequenceNumberForOrdering) {
recordParams.SequenceNumberForOrdering = sequenceNumberForOrdering;
}

await client.putRecord(recordParams);
return client.putRecord({
...(await recordsEncoder(record)),
StreamName: streamName || privateProps.streamName
});
}

/**
* Batch puts multiple records to a stream.
*
* @param {Object} options - The putRecords options. In addition to the params described here,
* uses [`AWS.Kinesis.putRecords` options]{@link external:AwsJsSdkKinesisPutRecords}
* @param {Object} params - The putRecords parameters. In addition to the params described here,
* uses [`AWS.Kinesis.putRecords` parameters]{@link external:AwsJsSdkKinesisPutRecords}
* from the `AWS.Kinesis.putRecords` method in camel case.
* @param {Array} options.records - A list of records to push to a Kinesis stream.
* @param {(Object|string)} options.records.data - The data to be used as the Kinesis message.
* @param {string} [options.streamName] - If provided, overrides the stream name provided
* @param {Array} params.records - A list of records to push to a Kinesis stream.
* @param {(Object|string)} params.records.data - The data to be used as the Kinesis message.
* @param {string} [params.streamName] - If provided, overrides the stream name provided
* on client instantiation.
* @fulfil If records are successfully pushed to stream.
* @reject {Error} - On any unexpected error while pushing to stream.
* @returns {Promise}
*/
async putRecords(options) {
const { records, streamName: streamNameOpt } = options;
const { client, compression, streamName } = internal(this);

async putRecords({ records, streamName }) {
const privateProps = internal(this);
const { client, recordsEncoder } = privateProps;
await ensureStreamInitialized(this);

const recordsParams = await Promise.all(
records.map(record => buildRecordParams(record, compression))
);

const params = {
Records: recordsParams,
StreamName: streamNameOpt || streamName
};

await client.putRecords(params);
return client.putRecords({
Records: await Promise.all(records.map(recordsEncoder)),
StreamName: streamName || privateProps.streamName
});
}
}

Expand Down
2 changes: 1 addition & 1 deletion lib/polling-consumer.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';

const { getRecordsDecoder } = require('./records-decoder');
const { getRecordsDecoder } = require('./records');
const { getStreamShards } = require('./stream');

const privateData = new WeakMap();
Expand Down
Loading

0 comments on commit bad779e

Please sign in to comment.