Skip to content

Commit

Permalink
Make sure the shouldParseJson flag is passed down
Browse files Browse the repository at this point in the history
  • Loading branch information
eaviles committed Oct 23, 2019
1 parent 9e65110 commit 64d3a28
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 4 deletions.
7 changes: 7 additions & 0 deletions lib/consumers-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class ConsumersManager {
* option defines the delay in milliseconds in between poll requests for more records.
* @param {Function} options.pushToStream - A function to push incoming records to the consumer.
* @param {object} options.s3 - The S3 options in the current kinesis client.
* @param {string|boolean} [options.shouldParseJson] - Whether if retrieved records' data should
* be parsed as JSON or not.
* @param {object} options.stateStore - An instance of the state store.
* @param {string} options.streamName - The name of the Kinesis stream.
* @param {boolean} options.useAutoCheckpoints - Whether to automatically store shard checkpoints
Expand All @@ -68,6 +70,7 @@ class ConsumersManager {
pushToStream,
s3,
s3Client,
shouldParseJson,
stateStore,
streamName,
useAutoCheckpoints,
Expand All @@ -88,6 +91,7 @@ class ConsumersManager {
pushToStream,
s3,
s3Client,
shouldParseJson,
stateStore,
streamName,
useAutoCheckpoints,
Expand Down Expand Up @@ -118,6 +122,7 @@ class ConsumersManager {
pushToStream,
s3,
s3Client,
shouldParseJson,
stateStore,
streamName,
useAutoCheckpoints,
Expand Down Expand Up @@ -177,6 +182,7 @@ class ConsumersManager {
stateStore,
stopConsumer,
streamName,
shouldParseJson,
useAutoCheckpoints,
usePausedPolling,
useS3ForLargeItems,
Expand All @@ -191,6 +197,7 @@ class ConsumersManager {
pushToStream,
s3,
s3Client,
shouldParseJson,
shardId,
stateStore,
stopConsumer,
Expand Down
12 changes: 11 additions & 1 deletion lib/fan-out-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ class FanOutConsumer {
pushToStream,
s3,
shardId,
shouldParseJson,
stateStore,
stopConsumer,
streamName,
Expand Down Expand Up @@ -241,6 +242,7 @@ class FanOutConsumer {
retryPipeline: true,
s3,
shardId,
shouldParseJson,
stateStore,
stopConsumer,
streamName,
Expand All @@ -267,6 +269,7 @@ class FanOutConsumer {
s3,
s3Client,
shardId,
shouldParseJson,
stateStore,
stopConsumer,
streamName,
Expand Down Expand Up @@ -356,7 +359,14 @@ class FanOutConsumer {
stream,
new PreProcess({ requestFlags }),
new Parser(),
new RecordsDecoder({ compression, logger, s3, s3Client, useS3ForLargeItems }),
new RecordsDecoder({
compression,
logger,
s3,
s3Client,
shouldParseJson,
useS3ForLargeItems
}),
new PostProcess({
abort,
logger,
Expand Down
4 changes: 2 additions & 2 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ class Kinesis extends PassThrough {
pollDelay = 250,
s3 = {},
shardCount = 1,
shouldParseJson,
shouldParseJson = 'auto',
statsInterval = 30000,
streamName,
tags,
Expand Down Expand Up @@ -332,7 +332,7 @@ class Kinesis extends PassThrough {
},
s3Client: null,
shardCount: shardCountNumber >= 1 ? shardCountNumber : 1,
shouldParseJson: 'auto',
shouldParseJson,
statsInterval: statsIntervalNumber >= 1000 ? statsIntervalNumber : 30000,
streamName,
tags,
Expand Down
2 changes: 2 additions & 0 deletions lib/polling-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ class PollingConsumer {
s3,
s3Client,
shardId,
shouldParseJson,
stateStore,
stopConsumer,
streamName,
Expand All @@ -245,6 +246,7 @@ class PollingConsumer {
inputEncoding: 'Buffer',
logger,
s3Client,
shouldParseJson,
useS3ForLargeItems
}),
s3,
Expand Down
6 changes: 5 additions & 1 deletion lib/records.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ function getRecordsDecoder({
data = Buffer.from(data, 'base64').toString('utf8');
}

if ((shouldParseJson === 'auto' && IS_JSON_REGEX.test(data)) || shouldParseJson === true) {
if (
((shouldParseJson === undefined || shouldParseJson === 'auto') &&
IS_JSON_REGEX.test(data)) ||
shouldParseJson === true
) {
data = JSON.parse(data);
}

Expand Down

0 comments on commit 64d3a28

Please sign in to comment.