Skip to content

Commit

Permalink
Refactor “setUpEnhancedConsumers” so it’s easier to follow
Browse files Browse the repository at this point in the history
  • Loading branch information
eaviles committed Apr 24, 2019
1 parent eadb90b commit 272c9f3
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 23 deletions.
27 changes: 24 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,12 @@ pipeline([
- [lifion-kinesis](#module_lifion-kinesis)
- [Kinesis](#exp_module_lifion-kinesis--Kinesis) ⇐ <code>external:Readable</code> ⏏
- [new Kinesis(options)](#new_module_lifion-kinesis--Kinesis_new)
- [.startConsumer()](#module_lifion-kinesis--Kinesis+startConsumer) ⇒ <code>Promise</code>
- [.putRecord(params)](#module_lifion-kinesis--Kinesis+putRecord) ⇒ <code>Promise</code>
- [.putRecords(params)](#module_lifion-kinesis--Kinesis+putRecords) ⇒ <code>Promise</code>
- _instance_
- [.startConsumer()](#module_lifion-kinesis--Kinesis+startConsumer) ⇒ <code>Promise</code>
- [.putRecord(params)](#module_lifion-kinesis--Kinesis+putRecord) ⇒ <code>Promise</code>
- [.putRecords(params)](#module_lifion-kinesis--Kinesis+putRecords) ⇒ <code>Promise</code>
- _inner_
- [~setUpEnhancedConsumers(instance)](#module_lifion-kinesis--Kinesis..setUpEnhancedConsumers) ⇒ <code>Promise</code>

<a name="exp_module_lifion-kinesis--Kinesis"></a>

Expand Down Expand Up @@ -148,6 +151,24 @@ Batch puts multiple records to a stream.
| params.records.data | <code>Object</code> \| <code>string</code> | The data to be used as the Kinesis message. |
| [params.streamName] | <code>string</code> | If provided, overrides the stream name provided on client instantiation. |

<a name="module_lifion-kinesis--Kinesis..setUpEnhancedConsumers"></a>

#### Kinesis~setUpEnhancedConsumers(instance) ⇒ <code>Promise</code>

If the `useEnhancedFanOut` option is enabled, this function will be called to prepare for the
automated distribution of the Amazon-registered enhanced fan-out consumers into the consumers
of this module on the same consumer group. The preparation consist in the pre-registration of
the maximum allowed number of enhanced fan-out consumers in Amazon, and also in making sure
that the state of the stream, in DynamoDB, reflects the existing enhanced fan-out consumers.
Stale state will be removed, existing enhanced fan-out consumers not registered by this module
will be preserved and used in the automated distribution.

**Kind**: inner method of [<code>Kinesis</code>](#exp_module_lifion-kinesis--Kinesis)

| Param | Type | Description |
| -------- | ------------------- | ------------------------------------------------- |
| instance | <code>Object</code> | A reference to the instance of the Kinesis class. |

## License

[MIT](./LICENSE)
52 changes: 32 additions & 20 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,39 +46,51 @@ async function ensureStreamInitialized(instance) {
if (tags) await confirmStreamTags(privateProps);
}

/**
* If the `useEnhancedFanOut` option is enabled, this function will be called to prepare for the
* automated distribution of the Amazon-registered enhanced fan-out consumers into the consumers
* of this module on the same consumer group. The preparation consist in the pre-registration of
* the maximum allowed number of enhanced fan-out consumers in Amazon, and also in making sure
* that the state of the stream, in DynamoDB, reflects the existing enhanced fan-out consumers.
* Stale state will be removed, existing enhanced fan-out consumers not registered by this module
* will be preserved and used in the automated distribution.
*
* @param {Object} instance - A reference to the instance of the Kinesis class.
* @returns {Promise}
*/
async function setUpEnhancedConsumers(instance) {
const { client, logger, stateStore, streamArn, streamName } = internal(instance);
logger.debug(`Cleaning up enhanced consumers for "${streamName}"…`);

let consumers = await getStreamConsumers({ client, logger, streamArn });
const consumerNames = Object.keys(consumers);
const namesToRegister = [];
// Retrieve the existing enhanced fan-out consumers for the stream.
const existingEnhancedConsumers = await getStreamConsumers({ client, logger, streamArn });
const existingEnhancedConsumersCount = Object.keys(existingEnhancedConsumers).length;

for (let i = consumerNames.length; i < MAX_ENHANCED_CONSUMERS; i += 1) {
namesToRegister.push(`${moduleName}-${generate()}`);
// Register new enhanced fan-out consumers until reaching the maximum allowed.
for (let i = existingEnhancedConsumersCount; i < MAX_ENHANCED_CONSUMERS; i += 1) {
const consumerName = `${moduleName}-${generate()}`;
await registerStreamConsumer({ client, consumerName, logger, streamArn });
}

await Promise.all(
namesToRegister.map(consumerName =>
registerStreamConsumer({ client, consumerName, logger, streamArn })
)
);

consumers = await getStreamConsumers({ client, logger, streamArn });
// Retrieve the enhanced fan-out consumers again (will include the newly registered ones).
const enhancedConsumers = await getStreamConsumers({ client, logger, streamArn });

// Make sure the stream state contains the newly registered consumers.
await Promise.all(
Object.keys(consumers).map(consumerName =>
stateStore.registerEnhancedConsumer(consumerName, consumers[consumerName].arn)
)
Object.keys(enhancedConsumers).map(consumerName => {
const { arn } = enhancedConsumers[consumerName];
return stateStore.registerEnhancedConsumer(consumerName, arn);
})
);

const knownConsumers = await stateStore.getStreamConsumers();
const unknownConsumers = Object.keys(knownConsumers).filter(
consumerName => !Object.keys(consumers).includes(consumerName)
);
// Get the enhanced consumers from the stream state.
const enhancedConsumersState = await stateStore.getStreamConsumers();

// Remove old enhanced fan-out consumers from the stream state.
await Promise.all(
unknownConsumers.map(consumerName => stateStore.deregisterStreamConsumer(consumerName))
Object.keys(enhancedConsumersState)
.filter(consumerName => !Object.keys(enhancedConsumers).includes(consumerName))
.map(async consumerName => stateStore.deregisterStreamConsumer(consumerName))
);
}

Expand Down

0 comments on commit 272c9f3

Please sign in to comment.