Skip to content

Commit

Permalink
Fix re-assignment of enhanced consumers when there are more consumers…
Browse files Browse the repository at this point in the history
… than enhanced consumers
  • Loading branch information
eaviles committed Apr 26, 2019
1 parent 40f22f6 commit 5e1109b
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 38 deletions.
49 changes: 13 additions & 36 deletions lib/fan-out-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ const AWS_JSON = 'application/x-amz-json-1.1';
const DEFAULT_KINESIS_ENDPOINT = 'https://kinesis.us-east-1.amazonaws.com';
const EXPIRATION_TIMEOUT_OFFSET = 1000;

const privateData = new WeakMap();
const asyncPipeline = promisify(pipeline);
const privateData = new WeakMap();

function internal(instance) {
if (!privateData.has(instance)) privateData.set(instance, {});
Expand Down Expand Up @@ -47,42 +47,21 @@ class PreProcess extends Transform {
}

class PostProcess extends Writable {
constructor({
markShardAsDepleted,
pushToStream,
shardId,
shardsPath,
shardsPathNames,
stateStore
}) {
constructor({ markShardAsDepleted, pushToStream, setCheckpoint, shardId }) {
super({ objectMode: true });
Object.assign(internal(this), {
markShardAsDepleted,
pushToStream,
shardId,
shardsPath,
shardsPathNames,
stateStore
setCheckpoint,
shardId
});
}

async _write(chunk, encoding, callback) {
const {
markShardAsDepleted,
pushToStream,
shardId,
shardsPath,
shardsPathNames,
stateStore
} = internal(this);
const { markShardAsDepleted, pushToStream, setCheckpoint, shardId } = internal(this);
const { continuationSequenceNumber, records } = chunk;
if (continuationSequenceNumber !== undefined) {
await stateStore.storeShardCheckpoint(
shardId,
continuationSequenceNumber,
shardsPath,
shardsPathNames
);
await setCheckpoint(continuationSequenceNumber);
if (records.length > 0) pushToStream(null, { ...chunk, shardId });
callback();
} else {
Expand Down Expand Up @@ -113,8 +92,9 @@ class FanOutConsumer {

const signRequest = async requestOptions => {
let { accessKeyId, secretAccessKey, sessionToken } = awsOptions;
if (!accessKeyId && !secretAccessKey && !sessionToken)
if (!accessKeyId && !secretAccessKey && !sessionToken) {
({ accessKeyId, secretAccessKey, sessionToken } = await credentialsChain.resolvePromise());
}
aws4.sign(requestOptions, { accessKeyId, secretAccessKey, sessionToken });
};

Expand Down Expand Up @@ -194,6 +174,10 @@ class FanOutConsumer {
stopConsumer(shardId);
};

const setCheckpoint = async sequenceNumber => {
await stateStore.storeShardCheckpoint(shardId, sequenceNumber, shardsPath, shardsPathNames);
};

do {
if (requestFlags.isEventStream === false) {
logger.warn(`Waiting before retrying the pipeline…`);
Expand Down Expand Up @@ -222,14 +206,7 @@ class FanOutConsumer {
new PreProcess({ requestFlags }),
new Parser(),
new RecordsDecoder({ compression }),
new PostProcess({
markShardAsDepleted,
pushToStream,
shardId,
shardsPath,
shardsPathNames,
stateStore
})
new PostProcess({ markShardAsDepleted, pushToStream, setCheckpoint, shardId })
]);
} catch (err) {
const { code, isRetryable, message } = err;
Expand Down
12 changes: 10 additions & 2 deletions lib/lease-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,19 +136,27 @@ class LeaseManager {

async start() {
const privateProps = internal(this);
const { consumersManager, logger } = privateProps;
const { consumersManager, logger, stateStore } = privateProps;

const acquireLeases = async () => {
logger.debug('Trying to acquire leases…');
const { streamArn } = await checkIfStreamExists(privateProps);

const { streamArn } = await checkIfStreamExists(privateProps);
if (streamArn === null) {
logger.debug("Can't acquire leases as the stream is gone.");
stopManager(this);
return;
}

const consumerArn = await stateStore.getAssignedEnhancedConsumer();
if (!consumerArn) {
logger.debug("Can't acquire leases now as there's no assigned enhanced consumer.");
privateProps.timeoutId = setTimeout(acquireLeases, ACQUIRE_LEASES_INTERVAL);
return;
}

const shards = await getStreamShards(privateProps);

const changesDetected = (await Object.keys(shards).reduce(async (result, id) => {
return (await result).concat(await acquireLease(this, id, shards));
}, [])).some(Boolean);
Expand Down

0 comments on commit 5e1109b

Please sign in to comment.