Skip to content

Commit

Permalink
Document the rest of the modules
Browse files Browse the repository at this point in the history
  • Loading branch information
eaviles committed Jun 5, 2019
1 parent 12901b7 commit 5d10b17
Show file tree
Hide file tree
Showing 11 changed files with 508 additions and 3 deletions.
38 changes: 38 additions & 0 deletions lib/compression.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,29 @@
/**
* Module with a suite of compress and decompress functions for different compression algorithms.
*
* @module compression
* @private
*/

'use strict';

const { compressAsync, decompressAsync } = require('lzutf8');

module.exports = {
/**
* Compress and decompress methods for the LZ-UTF8 algorithm. LZ-UTF8 is an extension to the
* [UTF-8]{@link external:UTF8} character encoding, augmenting the UTF-8 bytestream with
* optional compression based the [LZ77]{@link external:LZ77} algorithm.
*/
'LZ-UTF8': {
/**
* Compresses the given input using the specified encoding using LZ-UTF8.
*
* @param {Buffer} input - The buffer to compress.
* @param {string} outputEncoding - The encoding of the result.
* @fulfil {Buffer} - The compressed input.
* @returns {Promise}
*/
compress: (input, outputEncoding) =>
new Promise((resolve, reject) => {
const options = { outputEncoding, useWebWorker: false };
Expand All @@ -12,6 +32,14 @@ module.exports = {
else reject(err);
});
}),
/**
* Decompresses the given input using the specified encoding using LZ-UTF8.
*
* @param {Buffer} input - The buffer to decompress.
* @param {string} inputEncoding - The encoding of the input buffer to decompress.
* @fulfil {String} - A decompressed UTF-8 string.
* @returns {Promise}
*/
decompress: (input, inputEncoding) =>
new Promise((resolve, reject) => {
const options = { inputEncoding, useWebWorker: false };
Expand All @@ -22,3 +50,13 @@ module.exports = {
})
}
};

/**
* @external UTF8
* @see https://en.wikipedia.org/wiki/UTF-8
*/

/**
* @external LZ77
* @see https://en.wikipedia.org/wiki/LZ77_and_LZ78
*/
17 changes: 17 additions & 0 deletions lib/constants.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
/**
* Module with a collection of constants used across the codebase.
*
* @module constants
* @private
*/

'use strict';

/**
* A list of error codes that should not be retried.
*
* @memberof module:constants
*/
const BAIL_RETRY_LIST = Object.freeze([
'ConditionalCheckFailedException',
'MissingParameter',
Expand All @@ -12,6 +24,11 @@ const BAIL_RETRY_LIST = Object.freeze([
'ValidationException'
]);

/**
* A list of error codes that should always be retried.
*
* @memberof module:constants
*/
const FORCED_RETRY_LIST = Object.freeze(['ENOTFOUND', 'ENETUNREACH']);

module.exports = Object.freeze({
Expand Down
55 changes: 55 additions & 0 deletions lib/consumers-manager.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,58 @@
/**
* Module that ensures there are active consumers for the shards with an active lease.
*
* @module consumers-manager
* @private
*/

'use strict';

const FanOutConsumer = require('./fan-out-consumer');
const PollingConsumer = require('./polling-consumer');

const privateData = new WeakMap();

/**
* Provides access to the private data of the specified instance.
*
* @param {Object} instance - The private data's owner.
* @returns {Object} The private data.
* @private
*/
function internal(instance) {
if (!privateData.has(instance)) privateData.set(instance, {});
return privateData.get(instance);
}

/**
* Class that implements the consumers manager module.
*
* @alias module:consumers-manager
*/
class ConsumersManager {
/**
* Initializes an instance of the consumers manager.
*
* @param {Object} options - The initialization options.
* @param {Object} options.awsOptions - The initialization options for AWS.Kinesis.
* @param {Object} options.client - An instance of the Kinesis client.
* @param {string} options.compression - The kind of data compression to use with records.
* @param {number} options.limit - The limit of records per get records call.
* @param {Object} options.logger - An instance of a logger.
* @param {number} options.noRecordsPollDelay - The delay in milliseconds before attempting to
* get more records when there were none in the previous attempt.
* @param {number} options.pollDelay - When the `usePausedPolling` option is `false`, this
* option defines the delay in milliseconds in between poll requests for more records.
* @param {Function} options.pushToStream - A function to push incoming records to the consumer.
* @param {Object} options.stateStore - An instance of the state store.
* @param {string} options.streamName - The name of the Kinesis stream.
* @param {boolean} options.useAutoCheckpoints - Whether to automatically store shard checkpoints
* using the sequence number of the most-recently received record or not.
* @param {boolean} options.useEnhancedFanOut - Whether if the consumer is using enhanced
* fan-out shard consumers or not.
* @param {boolean} options.usePausedPolling - Whether if the client is waiting for
* user-intervention before polling for more records, or not.
*/
constructor(options) {
const {
awsOptions,
Expand Down Expand Up @@ -46,6 +88,14 @@ class ConsumersManager {
});
}

/**
* Triggers the reconciliation of shard consumers where new instances of either the fan-out or
* polling consumers will be initialized for newly acquired shard leases, or where running
* consumers will be stopped for lost or expired shard leases.
*
* @fulfil {undefined}
* @returns {Promise}
*/
async reconcile() {
const {
awsOptions,
Expand Down Expand Up @@ -137,6 +187,11 @@ class ConsumersManager {
.forEach(stopConsumer);
}

/**
* Stops all the running shard consumers.
*
* @returns {undefined}
*/
stop() {
const { consumers } = internal(this);
Object.keys(consumers).forEach(shardId => consumers[shardId].stop());
Expand Down
122 changes: 122 additions & 0 deletions lib/dynamodb-client.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
/**
* Module that wraps the calls to the AWS.DynamoDB library. Calls are wrapped so they can be
* retried with a custom logic instead of the one provided by the AWS-SDK. In addition to retries,
* calls are also promisified and the call stacks are preserved even in async/await calls by using
* the `CAPTURE_STACK_TRACE` environment variable.
*
* @module dynamodb-client
* @private
*/

'use strict';

const retry = require('async-retry');
Expand All @@ -9,11 +19,32 @@ const { getStackObj, shouldBailRetry, transformErrorStack } = require('./utils')
const privateData = new WeakMap();
const statsSource = 'dynamoDb';

/**
* Provides access to the private data of the specified instance.
*
* @param {Object} instance - The private data's owner.
* @returns {Object} The private data.
* @private
*/
function internal(instance) {
if (!privateData.has(instance)) privateData.set(instance, {});
return privateData.get(instance);
}

/**
* Calls a method on the given instance of AWS.DynamoDB. The call is promisified, the call stack
* is preserved, and the results of the call are aggregated in the stats. Retries in this function
* are the original ones provided by the AWS-SDK.
*
* @param {Object} client - An instance of AWS.DynamoDB.
* @param {string} methodName - The name of the method to call.
* @param {...*} args - The arguments of the method call.
* @fulfil {*} - The original response from the AWS.DynamoDB call.
* @reject {Error} - The error details from AWS.DynamoDB with a corrected error stack.
* @returns {Promise}
* @private
*
*/
async function sdkCall(client, methodName, ...args) {
const stackObj = getStackObj(sdkCall);
try {
Expand All @@ -35,6 +66,20 @@ async function sdkCall(client, methodName, ...args) {
}
}

/**
* Calls a method on the given instance of AWS.DynamoDB. The call is promisified, the call stack
* is preserved, and the results of the call are aggregated in the stats. Retries in this function
* are based on a custom logic replacing the one provided by the AWS-SDK.
*
* @param {Object} client - An instance of AWS.DynamoDB.
* @param {string} methodName - The name of the method to call.
* @param {Object} retryOpts - The [retry options as in async-retry]{@link external:AsyncRetry}.
* @param {...*} args - The argument of the method call.
* @fulfil {*} - The original response from the AWS.DynamoDB call.
* @reject {Error} - The error details from AWS.DynamoDB with a corrected error stack.
* @returns {Promise}
* @private
*/
function retriableSdkCall(client, methodName, retryOpts, ...args) {
const stackObj = getStackObj(retriableSdkCall);
return retry(bail => {
Expand All @@ -60,7 +105,20 @@ function retriableSdkCall(client, methodName, retryOpts, ...args) {
}, retryOpts);
}

/**
* A class that wraps AWS.DynamoDB.
*
* @alias module:dynamodb-client
*/
class DynamoDbClient {
/**
* Initializes the AWS.DynamoDB internal instance and prepares the retry logic.
*
* @param {Object} options - The initialization options.
* @param {Object} options.awsOptions - The initialization options for AWS.DynamoDB.
* @param {Object} options.logger - An instace of a logger.
* @param {string} options.tableName - The name of the DynamoDB table.
*/
constructor({ awsOptions, logger, tableName }) {
const client = new DynamoDB(awsOptions);

Expand Down Expand Up @@ -90,49 +148,113 @@ class DynamoDbClient {
Object.assign(internal(this), { client, docClient, retryOpts });
}

/**
* The CreateTable operation adds a new table to your account.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
createTable(...args) {
const { client } = internal(this);
return sdkCall(client, 'createTable', ...args);
}

/**
* Returns information about the table, including the current status of the table, when it was
* created, the primary key schema, and any indexes on the table.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
describeTable(...args) {
const { client, retryOpts } = internal(this);
return retriableSdkCall(client, 'describeTable', retryOpts, ...args);
}

/**
* List all tags on an Amazon DynamoDB resource.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
listTagsOfResource(...args) {
const { client, retryOpts } = internal(this);
return retriableSdkCall(client, 'listTagsOfResource', retryOpts, ...args);
}

/**
* Associate a set of tags with an Amazon DynamoDB resource.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
tagResource(...args) {
const { client } = internal(this);
return sdkCall(client, 'tagResource', ...args);
}

/**
* Waits for a given DynamoDB resource.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
waitFor(...args) {
const { client, retryOpts } = internal(this);
return retriableSdkCall(client, 'waitFor', retryOpts, ...args);
}

/**
* Deletes a single item in a table by primary key by delegating to `AWS.DynamoDB.deleteItem()`.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
delete(...args) {
const { docClient } = internal(this);
return sdkCall(docClient, 'delete', ...args);
}

/**
* Returns a set of attributes for the item with the given primary key by delegating to
* `AWS.DynamoDB.getItem()`.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
get(...args) {
const { docClient, retryOpts } = internal(this);
return retriableSdkCall(docClient, 'get', retryOpts, ...args);
}

/**
* Creates a new item, or replaces an old item with a new item by delegating to
* `AWS.DynamoDB.putItem()`.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
put(...args) {
const { docClient, retryOpts } = internal(this);
return retriableSdkCall(docClient, 'put', retryOpts, ...args);
}

/**
* Edits an existing item's attributes, or adds a new item to the table if it does not already
* exist by delegating to `AWS.DynamoDB.updateItem()`.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
update(...args) {
const { docClient, retryOpts } = internal(this);
return retriableSdkCall(docClient, 'update', retryOpts, ...args);
}
}

/**
* @external AsyncRetry
* @see https://github.com/zeit/async-retry#api
*/

module.exports = DynamoDbClient;
Loading

0 comments on commit 5d10b17

Please sign in to comment.