From f4e0d51da4ce3cba977d5f6a89aa07ca5f1ae63d Mon Sep 17 00:00:00 2001 From: Mackenzie Turner Date: Mon, 22 Apr 2019 17:36:30 -0400 Subject: [PATCH 1/2] Add retry to putRecord(s) --- lib/kinesis-client.js | 73 +++++++++++++++++++++++++++++++++---------- 1 file changed, 56 insertions(+), 17 deletions(-) diff --git a/lib/kinesis-client.js b/lib/kinesis-client.js index ef123346..708061ee 100644 --- a/lib/kinesis-client.js +++ b/lib/kinesis-client.js @@ -199,25 +199,64 @@ class KinesisClient { } putRecord(...args) { - return internal(this) - .client.putRecord(...args) - .promise() - .catch(err => { - const error = new Error(err.message); - error.code = err.code; - throw error; - }); + const { client, retryOpts } = internal(this); + return retry( + bail => + client + .putRecord(...args) + .promise() + .catch(err => { + if (shouldBailRetry(err)) bail(err); + else throw err; + }), + retryOpts + ).catch(err => { + const error = new Error(err.message); + error.code = err.code; + throw error; + }); } - putRecords(...args) { - return internal(this) - .client.putRecords(...args) - .promise() - .catch(err => { - const error = new Error(err.message); - error.code = err.code; - throw error; - }); + async putRecords(params) { + const { client, retryOpts } = internal(this); + const { Records, ...opts } = params; + + let records = Records; + let results = []; + let failedRecordCount = 0; + + return retry( + bail => + client + .putRecords({ ...opts, Records: records }) + .promise() + .then(payload => { + ({ FailedRecordCount: failedRecordCount, Records: results } = payload); + if (failedRecordCount === 0) return; + + let code; + let message; + + records = records.filter((record, i) => { + const { ErrorCode: errorCode, ErrorMessage: errorMessage } = results[i]; + if (errorCode && !code) code = errorCode; + if (errorMessage && !message) message = errorMessage; + return errorCode; + }); + + const errObj = { message, code, statusCode: null, requestId: null }; + throw errObj; + }) + .catch(err => { + if (shouldBailRetry(err)) bail(err); + else throw err; + }), + retryOpts + ).catch(err => { + const error = new Error(err.message); + error.code = err.code; + throw error; + }); } } From 0a92f99dcfe531760a6ddb53f7ed9f354c019452 Mon Sep 17 00:00:00 2001 From: Mackenzie Turner Date: Tue, 23 Apr 2019 14:39:35 -0400 Subject: [PATCH 2/2] Retry only on throughput --- lib/kinesis-client.js | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/lib/kinesis-client.js b/lib/kinesis-client.js index 708061ee..6cd76813 100644 --- a/lib/kinesis-client.js +++ b/lib/kinesis-client.js @@ -206,7 +206,7 @@ class KinesisClient { .putRecord(...args) .promise() .catch(err => { - if (shouldBailRetry(err)) bail(err); + if (err.code !== 'ProvisionedThroughputExceededException') bail(err); else throw err; }), retryOpts @@ -238,17 +238,22 @@ class KinesisClient { let message; records = records.filter((record, i) => { - const { ErrorCode: errorCode, ErrorMessage: errorMessage } = results[i]; - if (errorCode && !code) code = errorCode; - if (errorMessage && !message) message = errorMessage; - return errorCode; + const { ErrorCode, ErrorMessage } = results[i]; + if (ErrorCode && ErrorCode !== 'ProvisionedThroughputExceededException') { + code = ErrorCode; + message = ErrorMessage; + return false; + } + if (ErrorCode && !code) code = code || ErrorCode; + if (ErrorMessage && !message) message = ErrorMessage; + return ErrorCode; }); const errObj = { message, code, statusCode: null, requestId: null }; throw errObj; }) .catch(err => { - if (shouldBailRetry(err)) bail(err); + if (err.code !== 'ProvisionedThroughputExceededException') bail(err); else throw err; }), retryOpts