Skip to content

Commit

Permalink
Confirm parent depletion works with fan-out consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
eaviles committed Apr 25, 2019
1 parent 59a23d3 commit 6866d9d
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 24 deletions.
5 changes: 4 additions & 1 deletion lib/fan-out-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,14 @@ class FanOutConsumer {

stop() {
const privateProps = internal(this);
const { request } = privateProps;
const { expirationTimeoutId, request } = privateProps;
if (request) {
request.abort();
privateProps.request = null;
privateProps.retryPipeline = false;
}
clearTimeout(expirationTimeoutId);
privateProps.expirationTimeoutId = null;
}

updateLeaseExpiration(leaseExpiration) {
Expand All @@ -230,6 +232,7 @@ class FanOutConsumer {
privateProps.leaseExpiration = leaseExpiration;

clearTimeout(expirationTimeoutId);
privateProps.expirationTimeoutId = null;

const delay = new Date(leaseExpiration).getTime() - Date.now() - EXPIRATION_TIMEOUT_OFFSET;
if (delay < 0) return;
Expand Down
9 changes: 7 additions & 2 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,15 @@ async function setUpEnhancedConsumers(instance) {
const enhancedConsumersCount = Object.keys(enhancedConsumers).length;

// Register new enhanced fan-out consumers until reaching the maximum allowed.
const newEnhancedConsumers = [];
for (let i = enhancedConsumersCount; i < MAX_ENHANCED_CONSUMERS; i += 1) {
const consumerName = `${moduleName}-${generate()}`;
await registerEnhancedConsumer({ client, consumerName, logger, streamArn });
newEnhancedConsumers.push(`${moduleName}-${generate()}`);
}
await Promise.all(
newEnhancedConsumers.map(consumerName =>
registerEnhancedConsumer({ client, consumerName, logger, streamArn })
)
);

// Retrieve the enhanced fan-out consumers again (will include the newly registered ones).
enhancedConsumers = await getEnhancedConsumers({ client, logger, streamArn });
Expand Down
11 changes: 11 additions & 0 deletions lib/kinesis-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,17 @@ class KinesisClient {
});
}

deregisterStreamConsumer(...args) {
return internal(this)
.client.deregisterStreamConsumer(...args)
.promise()
.catch(err => {
const error = new Error(err.message);
error.code = err.code;
throw error;
});
}

registerStreamConsumer(...args) {
return internal(this)
.client.registerStreamConsumer(...args)
Expand Down
4 changes: 3 additions & 1 deletion lib/lease-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ async function acquireLease(instance, shardId, shardsDescription) {
const state = await stateStore.getShardAndStreamState(shardId, shardDescription);
const { shardState, streamState } = state;
const { consumers, shards } = streamState;
let ownLeasesCount = Object.values(shards).filter(i => i.leaseOwner === consumerId).length;
let { leaseExpiration, leaseOwner, version } = shardState;
const { depleted, parent } = shardState;
let ownLeasesCount = Object.values(shards).filter(
shard => shard.leaseOwner === consumerId && !shard.depleted
).length;

// If the shard has been marked as depleted, don't lease it.
if (depleted) {
Expand Down
21 changes: 1 addition & 20 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,6 @@ async function describeEnhancedConsumer(props) {
return Consumers.find(i => i.ConsumerName === consumerName) || {};
}

async function deregisterEnhancedConsumer(props) {
const { client, consumerArn, consumerName, logger, streamArn } = props;
await client.deregisterStreamConsumer({
ConsumerARN: consumerArn,
ConsumerName: consumerName,
StreamARN: streamArn
});
let { ConsumerStatus } = await describeEnhancedConsumer({ client, consumerName, streamArn });
if (ConsumerStatus === 'DELETING') {
logger.debug(`Waiting for the enhanced consumer "${consumerName}" to complete deletion…`);
do {
await wait(CONSUMER_STATE_CHECK_DELAY);
({ ConsumerStatus } = await describeEnhancedConsumer({ client, consumerName, streamArn }));
} while (ConsumerStatus === 'DELETING');
logger.debug(`The enhanced consumer "${consumerName}" is gone.`);
}
}

async function ensureStreamEncription(props) {
const { client, encryption, logger, streamName: StreamName } = props;
const { keyId: KeyId, type: EncryptionType } = encryption;
Expand Down Expand Up @@ -179,15 +161,14 @@ async function registerEnhancedConsumer(props) {
logger.debug(`Waiting for the new enhanced consumer "${consumerName}" to be active…`);
do {
await wait(CONSUMER_STATE_CHECK_DELAY);
({ ConsumerStatus } = await deregisterEnhancedConsumer({ client, consumerName, streamArn }));
({ ConsumerStatus } = await describeEnhancedConsumer({ client, consumerName, streamArn }));
} while (ConsumerStatus !== 'ACTIVE');
logger.debug(`The enhanced consumer "${consumerName}" is now active.`);
}

module.exports = {
checkIfStreamExists,
confirmStreamTags,
deregisterEnhancedConsumer,
ensureStreamEncription,
ensureStreamExists,
getEnhancedConsumers,
Expand Down

0 comments on commit 6866d9d

Please sign in to comment.