Skip to content

Commit

Permalink
Refactor put record(s) results
Browse files Browse the repository at this point in the history
  • Loading branch information
eaviles committed May 24, 2019
1 parent 5ff386e commit c86066b
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 163 deletions.
114 changes: 62 additions & 52 deletions README.md

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions lib/consumers-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class ConsumersManager {
awsOptions,
client,
compression,
limit,
logger,
noRecordsPollDelay,
pollDelay,
Expand All @@ -32,6 +33,7 @@ class ConsumersManager {
client,
compression,
consumers: {},
limit,
logger,
noRecordsPollDelay,
pollDelay,
Expand All @@ -50,6 +52,7 @@ class ConsumersManager {
client,
compression,
consumers,
limit,
logger,
noRecordsPollDelay,
pollDelay,
Expand Down Expand Up @@ -95,6 +98,7 @@ class ConsumersManager {
? new PollingConsumer({
client,
compression,
limit,
logger,
noRecordsPollDelay,
pollDelay,
Expand Down Expand Up @@ -132,6 +136,11 @@ class ConsumersManager {
.filter(shardId => !ownedShards[shardId])
.forEach(stopConsumer);
}

stop() {
const { consumers } = internal(this);
Object.keys(consumers).forEach(shardId => consumers[shardId].stop());
}
}

module.exports = ConsumersManager;
182 changes: 109 additions & 73 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,40 @@ const {

const privateData = new WeakMap();

/**
* Provides access to the private data of the specified instance.
*
* @param {Object} instance - The private data's owner.
* @returns {Object} The private data.
* @private
*/
function internal(instance) {
if (!privateData.has(instance)) privateData.set(instance, {});
return privateData.get(instance);
}

async function ensureStreamInitialized(instance, streamNameParam) {
/**
* Ensures a Kinesis stream exists and that is encrypted and tagged as required.
*
* @param {Object} instance - The instance of the Kinesis class where the call originated from.
* @param {string} [streamName] - The name of the stream to check initialization for.
* @fulfil {undefined}
* @returns {Promise}
* @private
*/
async function ensureStreamInitialized(instance, streamName) {
const privateProps = internal(instance);
const { encryption, streamArn, streamCreatedOn, streamName, tags } = privateProps;
let streamProps;

if (!streamNameParam || streamNameParam === streamName) {
if (streamArn && streamCreatedOn) return;
streamProps = privateProps;
Object.assign(privateProps, await ensureStreamExists(streamProps));
let params;
if (!streamName || streamName === privateProps.streamName) {
params = privateProps;
Object.assign(privateProps, await ensureStreamExists(params));
} else {
streamProps = Object.assign({}, privateProps, { streamName: streamNameParam });
await ensureStreamExists(streamProps);
params = { ...privateProps, streamName };
await ensureStreamExists(params);
}

if (encryption) await ensureStreamEncription(streamProps);
if (tags) await confirmStreamTags(streamProps);
const { encryption, tags } = params;
if (encryption) await ensureStreamEncription(params);
if (tags) await confirmStreamTags(params);
}

/**
Expand Down Expand Up @@ -108,13 +121,12 @@ async function setUpEnhancedConsumers(instance) {
}

/**
* A [pass-through stream]{@link external:NodeJsPassThrough} class specialization implementing a
* consumer of Kinesis Data Streams using the [AWS SDK for JavaScript]{@link external:AwsJsSdk}.
* Incoming data can be retrieved through either the `data` event or by piping the instance to a
* writable stream.
* A [pass-through stream]{@link external:PassThrough} class specialization implementing a consumer
* of Kinesis Data Streams using the [AWS SDK for JavaScript]{@link external:AwsJsSdk}. Incoming
* data can be retrieved through either the `data` event or by piping the instance to other streams.
*
* @alias module:lifion-kinesis
* @extends external:Readable
* @extends external:PassThrough
*/
class Kinesis extends PassThrough {
/**
Expand All @@ -130,8 +142,8 @@ class Kinesis extends PassThrough {
* @param {boolean} [options.createStreamIfNeeded=true] - Whether if the Kinesis stream should
* be automatically created if it doesn't exist upon connection
* @param {Object} [options.dynamoDb={}] - The initialization options for the DynamoDB client
* used to store the state of the stream consumers. In addition to `tableNames` and
* `tags`, it can also contain any of the [`AWS.DynamoDB` options]{@link AwsJsSdkDynamoDb}.
* used to store the state of the consumers. In addition to `tableNames` and `tags`, it
* can also contain any of the [`AWS.DynamoDB` options]{@link external:AwsJsSdkDynamoDb}.
* @param {string} [options.dynamoDb.tableName] - The name of the table in which to store the
* state of consumers. If not provided, it defaults to "lifion-kinesis-state".
* @param {Object} [options.dynamoDb.tags] - If provided, the client will ensure that the
Expand All @@ -142,6 +154,8 @@ class Kinesis extends PassThrough {
* @param {string} [options.encryption.keyId] - The GUID for the customer-managed AWS KMS key
* to use for encryption. This value can be a globally unique identifier, a fully
* specified ARN to either an alias or a key, or an alias name prefixed by "alias/".
* @param {number} [options.limit=10000] - The limit of records per get records call (only
* applicable with `useEnhancedFanOut` is set to `false`)
* @param {Object} [options.logger] - An object with the `warn`, `debug`, and `error` functions
* that will be used for logging purposes. If not provided, logging will be omitted.
* @param {number} [options.noRecordsPollDelay=1000] - The delay in milliseconds before
Expand Down Expand Up @@ -181,6 +195,7 @@ class Kinesis extends PassThrough {
createStreamIfNeeded = true,
dynamoDb = {},
encryption,
limit = 10000,
logger = {},
noRecordsPollDelay = 1000,
pollDelay = 250,
Expand All @@ -207,10 +222,11 @@ class Kinesis extends PassThrough {
throw new TypeError(errorMsg);
}

const normNoRecordsPollDelay = Number(noRecordsPollDelay);
const normPollDelay = Number(pollDelay);
const normShardCount = Number(shardCount);
const normStatsInterval = Number(statsInterval);
const limitNumber = Number(limit);
const noRecordsPollDelayNumber = Number(noRecordsPollDelay);
const pollDelayNumber = Number(pollDelay);
const shardCountNumber = Number(shardCount);
const statsIntervalNumber = Number(statsInterval);

Object.assign(internal(this), {
awsOptions,
Expand All @@ -222,12 +238,13 @@ class Kinesis extends PassThrough {
dynamoDb,
encryption,
getStatsIntervalId: null,
limit: limitNumber > 0 ? limitNumber : 10000,
logger: normLogger,
noRecordsPollDelay: normNoRecordsPollDelay >= 250 ? normNoRecordsPollDelay : 250,
pollDelay: normPollDelay >= 0 ? normPollDelay : 250,
noRecordsPollDelay: noRecordsPollDelayNumber >= 250 ? noRecordsPollDelayNumber : 250,
pollDelay: pollDelayNumber >= 0 ? pollDelayNumber : 250,
recordsEncoder: getRecordsEncoder(compression, 'Buffer'),
shardCount: normShardCount >= 1 ? normShardCount : 1,
statsInterval: normStatsInterval >= 1000 ? normStatsInterval : 30000,
shardCount: shardCountNumber >= 1 ? shardCountNumber : 1,
statsInterval: statsIntervalNumber >= 1000 ? statsIntervalNumber : 30000,
streamName,
tags,
useAutoCheckpoints: Boolean(useAutoCheckpoints),
Expand All @@ -238,11 +255,11 @@ class Kinesis extends PassThrough {
}

/**
* Initializes the client, by ensuring that the stream exists, it's ready, and configured as
* requested. The internal managers that deal with heartbeats, state, and consumers will also
* be started.
* Starts the stream consumer, by ensuring that the stream exists, that it's ready, and
* configured as requested. The internal managers that deal with heartbeats, state, and
* consumers will also be started.
*
* @fulfil Once the client has successfully started.
* @fulfil {undefined} - Once the consumer has successfully started.
* @reject {Error} - On any unexpected error while trying to start.
* @returns {Promise}
*/
Expand Down Expand Up @@ -289,49 +306,70 @@ class Kinesis extends PassThrough {
logger.debug('The consumer is now ready.');
}

/**
* Stops the stream consumer. The internal managers will also be stopped.
*/
stopConsumer() {
const privateProps = internal(this);
const { getStatsIntervalId } = privateProps;
const { consumersManager, getStatsIntervalId, heartbeatManager, leaseManager } = privateProps;
heartbeatManager.stop();
consumersManager.stop();
leaseManager.stop();
clearTimeout(getStatsIntervalId);
privateProps.getStatsIntervalId = null;
}

/**
* Puts a record to a stream.
* Writes a single data record into a stream.
*
* @param {Object} params - The putRecord parameters. In addition to the params described here,
* uses [`AWS.Kinesis.putRecord` parameters]{@link external:AwsJsSdkKinesisPutRecord}
* from the `AWS.Kinesis.putRecord` method in camel case.
* @param {(Object|string)} params.data - The data to be used as the Kinesis message.
* @param {string} [params.streamName] - If provided, overrides the stream name provided on
* client instantiation.
* @fulfil If record is successfully pushed to stream.
* @param {Object} params - The parameters.
* @param {*} params.data - The data to put into the record.
* @param {string} [params.explicitHashKey] - The hash value used to explicitly determine the
* shard the data record is assigned to by overriding the partition key hash.
* @param {string} [params.partitionKey] - Determines which shard in the stream the data record
* is assigned to. If omitted, it will be calculated based on a SHA-1 hash of the data.
* @param {string} [params.sequenceNumberForOrdering] - Set this to the sequence number obtained
* from the last put record operation to guarantee strictly increasing sequence numbers,
* for puts from the same client and to the same partition key. If omitted, records are
* coarsely ordered based on arrival time.
* @param {string} [params.streamName] - If provided, the record will be put into the specified
* stream instead of the stream name provided during the consumer instantiation.
* @fulfil {Object} - The de-serialized data returned from the request.
* @reject {Error} - On any unexpected error while pushing to stream.
* @returns {Promise}
*/
async putRecord({ streamName, ...record }) {
async putRecord(params = {}) {
const privateProps = internal(this);
const { client, recordsEncoder } = privateProps;
const params = {
const { client, createStreamIfNeeded, recordsEncoder } = privateProps;
const { streamName, ...record } = params;
const awsParams = {
...(await recordsEncoder(record)),
StreamName: streamName || privateProps.streamName
};
const parseResult = ({ EncryptionType, SequenceNumber, ShardId }) => ({
encryptionType: EncryptionType,
sequenceNumber: SequenceNumber,
shardId: ShardId
});
try {
const result = await client.putRecord(params);
return result;
return parseResult(await client.putRecord(awsParams));
} catch (err) {
if (err.code !== 'ResourceNotFoundException') throw err;
await ensureStreamInitialized(this, streamName);
return client.putRecord(params);
const { code } = err;
const streamDoesNotExist =
code === 'ResourceNotFoundException' ||
(code === 'UnknownError' && client.isEndpointLocal());
if (createStreamIfNeeded && streamDoesNotExist) {
await ensureStreamInitialized(this, streamName);
return parseResult(await client.putRecord(awsParams));
}
throw err;
}
}

/**
* Batch puts multiple records to a stream.
*
* @param {Object} params - The putRecords parameters. In addition to the params described here,
* uses [`AWS.Kinesis.putRecords` parameters]{@link external:AwsJsSdkKinesisPutRecords}
* from the `AWS.Kinesis.putRecords` method in camel case.
* @param {Object} params - The parameters.
* @param {Array} params.records - A list of records to push to a Kinesis stream.
* @param {(Object|string)} params.records.data - The data to be used as the Kinesis message.
* @param {string} [params.streamName] - If provided, overrides the stream name provided
Expand All @@ -340,23 +378,31 @@ class Kinesis extends PassThrough {
* @reject {Error} - On any unexpected error while pushing to stream.
* @returns {Promise}
*/
async putRecords({ records, streamName }) {
async putRecords(params = {}) {
const privateProps = internal(this);
const { client, recordsEncoder } = privateProps;
const params = {
const { client, createStreamIfNeeded, recordsEncoder } = privateProps;
const { records, streamName } = params;
const awsParams = {
Records: await Promise.all(records.map(recordsEncoder)),
StreamName: streamName || privateProps.streamName
};
const parseResult = ({ EncryptionType, Records }) => ({
encryptionType: EncryptionType,
records: Records.map(({ SequenceNumber, ShardId }) => ({
sequenceNumber: SequenceNumber,
shardId: ShardId
}))
});
try {
const result = await client.putRecords(params);
return result;
return parseResult(await client.putRecords(awsParams));
} catch (err) {
if (
err.code === 'ResourceNotFoundException' ||
(err.code === 'UnknownError' && client.isEndpointLocal())
) {
const { code } = err;
const streamDoesNotExist =
code === 'ResourceNotFoundException' ||
(code === 'UnknownError' && client.isEndpointLocal());
if (createStreamIfNeeded && streamDoesNotExist) {
await ensureStreamInitialized(this, streamName);
return client.putRecords(params);
return parseResult(await client.putRecords(awsParams));
}
throw err;
}
Expand All @@ -382,23 +428,13 @@ class Kinesis extends PassThrough {
* @see https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#constructor-property
*/

/**
* @external AwsJsSdkKinesisPutRecord
* @see https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#putRecord-property
*/

/**
* @external AwsJsSdkKinesisPutRecords
* @see https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#putRecords-property
*/

/**
* @external AwsJsSdkDynamoDb
* @see https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB.html#constructor-property
*/

/**
* @external NodeJsPassThrough
* @external PassThrough
* @see https://nodejs.org/dist/latest-v10.x/docs/api/stream.html#stream_class_stream_passthrough
*/

Expand Down
Loading

0 comments on commit c86066b

Please sign in to comment.