diff --git a/README.md b/README.md index 6292fd79..43cccc22 100644 --- a/README.md +++ b/README.md @@ -85,37 +85,38 @@ data can be retrieved through either the `data` event or by piping the instance Initializes a new instance of the Kinesis client. -| Param | Type | Default | Description | -| -------------------------------- | ------------------------------------------- | ----------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| options | object | | The initialization options. In addition to the below options, it can also contain any of the [`AWS.Kinesis` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#constructor-property). | -| [options.compression] | string | | The kind of data compression to use with records. The currently available compression options are either `"LZ-UTF8"` or none. | -| [options.consumerGroup] | string | | The name of the group of consumers in which shards will be distributed and checkpoints will be shared. If not provided, it defaults to the name of the application/project using this module. | -| [options.createStreamIfNeeded] | boolean | true | Whether if the Kinesis stream should be automatically created if it doesn't exist upon connection | -| [options.dynamoDb] | object | {} | The initialization options for the DynamoDB client used to store the state of the consumers. In addition to `tableNames` and `tags`, it can also contain any of the [`AWS.DynamoDB` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB.html#constructor-property). | -| [options.dynamoDb.tableName] | string | | The name of the table in which to store the state of consumers. If not provided, it defaults to "lifion-kinesis-state". | -| [options.dynamoDb.tags] | object | | If provided, the client will ensure that the DynamoDB table where the state is stored is tagged with these tags. If the table already has tags, they will be merged. | -| [options.encryption] | object | | The encryption options to enforce in the stream. | -| [options.encryption.type] | string | | The encryption type to use. | -| [options.encryption.keyId] | string | | The GUID for the customer-managed AWS KMS key to use for encryption. This value can be a globally unique identifier, a fully specified ARN to either an alias or a key, or an alias name prefixed by "alias/". | -| [options.limit] | number | 10000 | The limit of records per get records call (only applicable with `useEnhancedFanOut` is set to `false`) | -| [options.logger] | object | | An object with the `warn`, `debug`, and `error` functions that will be used for logging purposes. If not provided, logging will be omitted. | -| [options.noRecordsPollDelay] | number | 1000 | The delay in milliseconds before attempting to get more records when there were none in the previous attempt (only applicable when `useEnhancedFanOut` is set to `false`) | -| [options.pollDelay] | number | 250 | When the `usePausedPolling` option is `false`, this option defines the delay in milliseconds in between poll requests for more records (only applicable when `useEnhancedFanOut` is set to `false`) | -| [options.s3] | object | {} | The initialization options for the S3 client used to store large items in buckets. In addition to `bucketName` and `endpoint`, it can also contain any of the [`AWS.S3` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/S3.html#constructor-property). | -| [options.s3.bucketName] | string | | The name of the bucket in which to store large messages. If not provided, it defaults to the name of the Kinesis stream. | -| [options.s3.largeItemThreshold] | number | 900 | The size in KB above which an item should automatically be stored in s3. | -| [options.s3.nonS3Keys] | Array.<string> | [] | If the `useS3ForLargeItems` option is set to `true`, the `nonS3Keys` option lists the keys that will be sent normally on the kinesis record. | -| [options.s3.tags] | string | | If provided, the client will ensure that the S3 bucket is tagged with these tags. If the bucket already has tags, they will be merged. | -| [options.shardCount] | number | 1 | The number of shards that the newly-created stream will use (if the `createStreamIfNeeded` option is set) | -| [options.shouldParseJson] | string \| boolean | "auto" | Whether if retrieved records' data should be parsed as JSON or not. Set to "auto" to only attempt parsing if data looks like JSON. Set to true to force data parse. | -| [options.statsInterval] | number | 30000 | The interval in milliseconds for how often to emit the "stats" event. The event is only available while the consumer is running. | -| options.streamName | string | | The name of the stream to consume data from (required) | -| [options.tags] | object | | If provided, the client will ensure that the stream is tagged with these tags upon connection. If the stream is already tagged, the existing tags will be merged with the provided ones before updating them. | -| [options.useAutoCheckpoints] | boolean | true | Set to `true` to make the client automatically store shard checkpoints using the sequence number of the most-recently received record. If set to `false` consumers can use the `setCheckpoint()` function to store any sequence number as the checkpoint for the shard. | -| [options.useAutoShardAssignment] | boolean | true | Set to `true` to automatically assign the stream shards to the active consumers in the same group (so only one client reads from one shard at the same time). Set to `false` to make the client read from all shards. | -| [options.useEnhancedFanOut] | boolean | false | Set to `true` to make the client use enhanced fan-out consumers to read from shards. | -| [options.usePausedPolling] | boolean | false | Set to `true` to make the client not to poll for more records until the consumer calls `continuePolling()`. This option is useful when consumers want to make sure the records are fully processed before receiving more (only applicable when `useEnhancedFanOut` is set to `false`) | -| [options.useS3ForLargeItems] | boolean | false | Whether to automatically use an S3 bucket to store large items or not. | +| Param | Type | Default | Description | +| ----------------------------------- | ------------------------------------------- | ----------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| options | object | | The initialization options. In addition to the below options, it can also contain any of the [`AWS.Kinesis` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#constructor-property). | +| [options.compression] | string | | The kind of data compression to use with records. The currently available compression options are either `"LZ-UTF8"` or none. | +| [options.consumerGroup] | string | | The name of the group of consumers in which shards will be distributed and checkpoints will be shared. If not provided, it defaults to the name of the application/project using this module. | +| [options.createStreamIfNeeded] | boolean | true | Whether if the Kinesis stream should be automatically created if it doesn't exist upon connection | +| [options.dynamoDb] | object | {} | The initialization options for the DynamoDB client used to store the state of the consumers. In addition to `tableNames` and `tags`, it can also contain any of the [`AWS.DynamoDB` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB.html#constructor-property). | +| [options.dynamoDb.tableName] | string | | The name of the table in which to store the state of consumers. If not provided, it defaults to "lifion-kinesis-state". | +| [options.dynamoDb.tags] | object | | If provided, the client will ensure that the DynamoDB table where the state is stored is tagged with these tags. If the table already has tags, they will be merged. | +| [options.encryption] | object | | The encryption options to enforce in the stream. | +| [options.encryption.type] | string | | The encryption type to use. | +| [options.encryption.keyId] | string | | The GUID for the customer-managed AWS KMS key to use for encryption. This value can be a globally unique identifier, a fully specified ARN to either an alias or a key, or an alias name prefixed by "alias/". | +| [options.limit] | number | 10000 | The limit of records per get records call (only applicable with `useEnhancedFanOut` is set to `false`) | +| [options.logger] | object | | An object with the `warn`, `debug`, and `error` functions that will be used for logging purposes. If not provided, logging will be omitted. | +| [options.noRecordsPollDelay] | number | 1000 | The delay in milliseconds before attempting to get more records when there were none in the previous attempt (only applicable when `useEnhancedFanOut` is set to `false`) | +| [options.pollDelay] | number | 250 | When the `usePausedPolling` option is `false`, this option defines the delay in milliseconds in between poll requests for more records (only applicable when `useEnhancedFanOut` is set to `false`) | +| [options.s3] | object | {} | The initialization options for the S3 client used to store large items in buckets. In addition to `bucketName` and `endpoint`, it can also contain any of the [`AWS.S3` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/S3.html#constructor-property). | +| [options.s3.bucketName] | string | | The name of the bucket in which to store large messages. If not provided, it defaults to the name of the Kinesis stream. | +| [options.s3.largeItemThreshold] | number | 900 | The size in KB above which an item should automatically be stored in s3. | +| [options.s3.nonS3Keys] | Array.<string> | [] | If the `useS3ForLargeItems` option is set to `true`, the `nonS3Keys` option lists the keys that will be sent normally on the kinesis record. | +| [options.s3.tags] | string | | If provided, the client will ensure that the S3 bucket is tagged with these tags. If the bucket already has tags, they will be merged. | +| [options.shardCount] | number | 1 | The number of shards that the newly-created stream will use (if the `createStreamIfNeeded` option is set) | +| [options.shouldParseJson] | string \| boolean | "auto" | Whether if retrieved records' data should be parsed as JSON or not. Set to "auto" to only attempt parsing if data looks like JSON. Set to true to force data parse. | +| [options.statsInterval] | number | 30000 | The interval in milliseconds for how often to emit the "stats" event. The event is only available while the consumer is running. | +| options.streamName | string | | The name of the stream to consume data from (required) | +| [options.supressThroughputWarnings] | boolean | false | Set to `true` to make the client log ProvisionedThroughputExceededException as debug rather than warning. | +| [options.tags] | object | | If provided, the client will ensure that the stream is tagged with these tags upon connection. If the stream is already tagged, the existing tags will be merged with the provided ones before updating them. | +| [options.useAutoCheckpoints] | boolean | true | Set to `true` to make the client automatically store shard checkpoints using the sequence number of the most-recently received record. If set to `false` consumers can use the `setCheckpoint()` function to store any sequence number as the checkpoint for the shard. | +| [options.useAutoShardAssignment] | boolean | true | Set to `true` to automatically assign the stream shards to the active consumers in the same group (so only one client reads from one shard at the same time). Set to `false` to make the client read from all shards. | +| [options.useEnhancedFanOut] | boolean | false | Set to `true` to make the client use enhanced fan-out consumers to read from shards. | +| [options.usePausedPolling] | boolean | false | Set to `true` to make the client not to poll for more records until the consumer calls `continuePolling()`. This option is useful when consumers want to make sure the records are fully processed before receiving more (only applicable when `useEnhancedFanOut` is set to `false`) | +| [options.useS3ForLargeItems] | boolean | false | Whether to automatically use an S3 bucket to store large items or not. | diff --git a/lib/index.js b/lib/index.js index a22b9b27..e8e854da 100644 --- a/lib/index.js +++ b/lib/index.js @@ -235,6 +235,8 @@ class Kinesis extends PassThrough { * @param {number} [options.statsInterval=30000] - The interval in milliseconds for how often to * emit the "stats" event. The event is only available while the consumer is running. * @param {string} options.streamName - The name of the stream to consume data from (required) + * @param {boolean} [options.supressThroughputWarnings=false] - Set to `true` to make the client + * log ProvisionedThroughputExceededException as debug rather than warning. * @param {object} [options.tags] - If provided, the client will ensure that the stream is tagged * with these tags upon connection. If the stream is already tagged, the existing tags * will be merged with the provided ones before updating them. @@ -272,6 +274,7 @@ class Kinesis extends PassThrough { shouldParseJson = 'auto', statsInterval = 30000, streamName, + supressThroughputWarnings = false, tags, useAutoCheckpoints = true, useAutoShardAssignment = true, @@ -311,7 +314,12 @@ class Kinesis extends PassThrough { Object.assign(internal(this), { awsOptions, - client: new KinesisClient({ awsOptions, logger: normLogger, streamName }), + client: new KinesisClient({ + awsOptions, + logger: normLogger, + streamName, + supressThroughputWarnings + }), compression, consumerGroup, consumerId: generate(), diff --git a/lib/index.test.js b/lib/index.test.js index 01cb2260..dc79b6fc 100644 --- a/lib/index.test.js +++ b/lib/index.test.js @@ -249,7 +249,12 @@ describe('lib/index', () => { warn: expect.any(Function) }; const streamName = 'test-stream'; - expect(KinesisClient).toHaveBeenCalledWith({ awsOptions, logger, streamName }); + expect(KinesisClient).toHaveBeenCalledWith({ + awsOptions, + logger, + streamName, + supressThroughputWarnings: false + }); await expect(kinesis.startConsumer()).resolves.toBeUndefined(); const useEnhancedFanOut = false; const consumerId = '0000'; diff --git a/lib/kinesis-client.js b/lib/kinesis-client.js index 0fb1ca96..5d5494ae 100644 --- a/lib/kinesis-client.js +++ b/lib/kinesis-client.js @@ -131,8 +131,10 @@ class KinesisClient { * @param {object} options.awsOptions - The initialization options for AWS.Kinesis. * @param {object} options.logger - An instace of a logger. * @param {string} options.streamName - The name of the Kinesis stream for which calls relate to. + * @param {boolean} options.supressThroughputWarnings - Flag indicating whether or not + * to supress ProvisionedThroughputExceededException warning logs. */ - constructor({ awsOptions, logger, streamName }) { + constructor({ awsOptions, logger, streamName, supressThroughputWarnings }) { const client = new Kinesis(awsOptions); const retryOpts = { @@ -140,7 +142,11 @@ class KinesisClient { minTimeout: 1000, onRetry: err => { const { code, message, requestId, statusCode } = err; - logger.warn( + const loggerMethod = + supressThroughputWarnings && code === 'ProvisionedThroughputExceededException' + ? 'debug' + : 'warn'; + logger[loggerMethod]( `Trying to recover from AWS.Kinesis error…\n${[ `\t- Message: ${message}`, `\t- Request ID: ${requestId}`, diff --git a/lib/kinesis-client.test.js b/lib/kinesis-client.test.js index 3c7056e6..2b9379d3 100644 --- a/lib/kinesis-client.test.js +++ b/lib/kinesis-client.test.js @@ -34,8 +34,9 @@ function putTwoRecords(params) { } describe('lib/kinesis-client', () => { + const debug = jest.fn(); const warn = jest.fn(); - const logger = { warn }; + const logger = { debug, warn }; let client; let error; @@ -162,6 +163,23 @@ describe('lib/kinesis-client', () => { expect(reportError).toHaveBeenCalledWith('kinesis', error, 'test-stream'); expect(warn).toHaveBeenCalled(); }); + + test(`${methodName} retries errors from the wrapped SDK bug supresses logged warnings to debugs`, async () => { + error = Object.assign(new Error('foo'), { code: 'ProvisionedThroughputExceededException' }); + sdkClient[methodName].mockImplementationOnce(() => ({ + promise: () => Promise.reject(error) + })); + client = new KinesisClient({ + logger, + streamName: 'test-stream', + supressThroughputWarnings: true + }); + const promise = client[methodName]({}); + await expect(promise).resolves.toEqual({}); + expect(sdkClient[methodName]).toHaveBeenCalledTimes(2); + expect(reportError).toHaveBeenCalledWith('kinesis', error, 'test-stream'); + expect(debug).toHaveBeenCalled(); + }); } if (methodName === 'createStream' || methodName === 'startStreamEncryption') {