Skip to content

Commit

Permalink
Simplify the polling consumer set checkpoint calls
Browse files Browse the repository at this point in the history
  • Loading branch information
eaviles committed Apr 24, 2019
1 parent bad779e commit eadb90b
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 13 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ Initializes a new instance of the Kinesis client.
| Param | Type | Default | Description |
| -------------------------------- | -------------------- | ------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| options | <code>Object</code> | | The initialization options. In addition to the below options, it can also contain any of the [`AWS.Kinesis` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#constructor-property). |
| [options.compression] | <code>string</code> | | The kind of data compression to use wit records. The currently available compression options are either `"LZ-UTF8"` or none. |
| [options.compression] | <code>string</code> | | The kind of data compression to use with records. The currently available compression options are either `"LZ-UTF8"` or none. |
| [options.consumerGroup] | <code>string</code> | | The name of the group of consumers in which shards will be distributed and checkpoints will be shared. If not provided, it defaults to the name of the application/project using this module. |
| [options.createStreamIfNeeded] | <code>boolean</code> | <code>true</code> | Whether if the Kinesis stream should be automatically created if it doesn't exist upon connection |
| [options.dynamoDb] | <code>Object</code> | <code>{}</code> | The initialization options for the DynamoDB client used to store the state of the stream consumers. In addition to `tableNames` and `tags`, it can also contain any of the [`AWS.DynamoDB` options](AwsJsSdkDynamoDb). |
Expand Down
2 changes: 1 addition & 1 deletion lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class Kinesis extends PassThrough {
*
* @param {Object} options - The initialization options. In addition to the below options, it
* can also contain any of the [`AWS.Kinesis` options]{@link external:AwsJsSdkKinesis}.
* @param {string} [options.compression] - The kind of data compression to use wit records.
* @param {string} [options.compression] - The kind of data compression to use with records.
* The currently available compression options are either `"LZ-UTF8"` or none.
* @param {string} [options.consumerGroup] - The name of the group of consumers in which shards
* will be distributed and checkpoints will be shared. If not provided, it defaults to
Expand Down
14 changes: 3 additions & 11 deletions lib/polling-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,6 @@ async function getShardIterator(client, logger, streamName, shardId, sequenceNum
}
}

async function storeShardCheckpoint(instance, shardId, sequenceNumber) {
if (!sequenceNumber || typeof sequenceNumber !== 'string') {
throw new TypeError('The sequence number argument is required');
}
const { stateStore } = internal(instance);
await stateStore.storeShardCheckpoint(shardId, sequenceNumber);
}

/**
* Polls for records and pushes them to the parent stream. If auto-checkpoints are enabled, they
* will be stored before the request for records.
Expand Down Expand Up @@ -88,7 +80,7 @@ async function pollForRecords(instance) {
}

if (seqNumToCheckpoint) {
await storeShardCheckpoint(instance, shardId, seqNumToCheckpoint);
await stateStore.storeShardCheckpoint(shardId, seqNumToCheckpoint);
privateProps.seqNumToCheckpoint = null;
}

Expand Down Expand Up @@ -131,7 +123,7 @@ async function pollForRecords(instance) {
if (useAutoCheckpoints) {
const { sequenceNumber } = records[recordsCount - 1];
if (!usePausedPolling) {
await storeShardCheckpoint(instance, shardId, sequenceNumber);
await stateStore.storeShardCheckpoint(shardId, sequenceNumber);
} else {
privateProps.seqNumToCheckpoint = sequenceNumber;
}
Expand Down Expand Up @@ -178,7 +170,7 @@ class PollingConsumer {
pushToStream,
recordsDecoder: getRecordsDecoder(compression, 'Buffer'),
seqNumToCheckpoint: null,
setCheckpoint: storeShardCheckpoint.bind(this, this, shardId),
setCheckpoint: sequenceNumber => stateStore.storeShardCheckpoint(shardId, sequenceNumber),
shardId,
stateStore,
stopConsumer,
Expand Down
1 change: 1 addition & 0 deletions lib/state-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ class StateStore {
}

async storeShardCheckpoint(shardId, checkpoint) {
if (typeof checkpoint !== 'string') throw new TypeError('The sequence number is required.');
const { client, consumerGroup, shardsPath, shardsPathNames, streamName } = internal(this);

await client.update({
Expand Down

0 comments on commit eadb90b

Please sign in to comment.