Skip to content

Commit

Permalink
Correct the proxies, prepare for shard reader timers
Browse files Browse the repository at this point in the history
  • Loading branch information
eaviles committed Apr 4, 2019
1 parent db13161 commit e64136d
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 41 deletions.
48 changes: 39 additions & 9 deletions lib/dynamodb-proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,18 @@ class DynamoDBProxy {
});
}

async describeTable(...args) {
createTable(...args) {
return internal(this)
.client.createTable(...args)
.promise()
.catch(err => {
const error = new Error(err.message);
error.code = err.code;
throw error;
});
}

describeTable(...args) {
return internal(this)
.client.describeTable(...args)
.promise()
Expand All @@ -30,7 +41,18 @@ class DynamoDBProxy {
});
}

async listTagsOfResource(...args) {
get(...args) {
return internal(this)
.docClient.get(...args)
.promise()
.catch(err => {
const error = new Error(err.message);
error.code = err.code;
throw error;
});
}

listTagsOfResource(...args) {
return internal(this)
.client.listTagsOfResource(...args)
.promise()
Expand All @@ -41,7 +63,7 @@ class DynamoDBProxy {
});
}

async put(...args) {
put(...args) {
return internal(this)
.docClient.put(...args)
.promise()
Expand All @@ -52,10 +74,9 @@ class DynamoDBProxy {
});
}

async get(...args) {
// console.trace('GET');
tagResource(...args) {
return internal(this)
.docClient.get(...args)
.client.tagResource(...args)
.promise()
.catch(err => {
const error = new Error(err.message);
Expand All @@ -64,8 +85,7 @@ class DynamoDBProxy {
});
}

async update(...args) {
// console.trace('UPDATE');
update(...args) {
return internal(this)
.docClient.update(...args)
.promise()
Expand All @@ -75,6 +95,16 @@ class DynamoDBProxy {
throw error;
});
}
}

waitFor(...args) {
return internal(this)
.client.waitFor(...args)
.promise()
.catch(err => {
const error = new Error(err.message);
error.code = err.code;
throw error;
});
}
}
module.exports = DynamoDBProxy;
2 changes: 1 addition & 1 deletion lib/heartbeat-manager.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';

const HEARTBEAT_INTERVAL = 30000;
const HEARTBEAT_INTERVAL = 10000;
const HEARTBEAT_FAILURE_TIMEOUT = HEARTBEAT_INTERVAL * 3;

const privateData = new WeakMap();
Expand Down
32 changes: 27 additions & 5 deletions lib/kinesis-proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,18 @@ class KinesisProxy {
internal(this).client = new Kinesis(options);
}

async describeStream(...args) {
addTagsToStream(...args) {
return internal(this)
.client.addTagsToStream(...args)
.promise()
.catch(err => {
const error = new Error(err.message);
error.code = err.code;
throw error;
});
}

describeStream(...args) {
return internal(this)
.client.describeStream(...args)
.promise()
Expand All @@ -25,7 +36,18 @@ class KinesisProxy {
});
}

async listTagsForStream(...args) {
listShards(...args) {
return internal(this)
.client.listShards(...args)
.promise()
.catch(err => {
const error = new Error(err.message);
error.code = err.code;
throw error;
});
}

listTagsForStream(...args) {
return internal(this)
.client.listTagsForStream(...args)
.promise()
Expand All @@ -36,9 +58,9 @@ class KinesisProxy {
});
}

async listShards(...args) {
startStreamEncryption(...args) {
return internal(this)
.client.listShards(...args)
.client.startStreamEncryption(...args)
.promise()
.catch(err => {
const error = new Error(err.message);
Expand All @@ -47,7 +69,7 @@ class KinesisProxy {
});
}

async waitFor(...args) {
waitFor(...args) {
return internal(this)
.client.waitFor(...args)
.promise()
Expand Down
28 changes: 18 additions & 10 deletions lib/lease-manager.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict';

const ACQUIRE_LEASES_INTERVAL = 30 * 1000;
const LEASE_TERM_TIMEOUT = 5 * 60 * 1000;
const ACQUIRE_LEASES_INTERVAL = 10 * 1000;
const LEASE_TERM_TIMEOUT = 1 * 60 * 1000;
const LEASE_RENEWAL_OFFSET = Math.round(LEASE_TERM_TIMEOUT * 0.25);

const stream = require('./stream');
Expand Down Expand Up @@ -43,7 +43,7 @@ async function acquireLease(instance, shardId, shardsDescription) {
ownLeasesCount -= 1;
} else {
logger.debug(`Shard "${shardId}" is currently owned by this consumer.`);
return;
return false;
}
}

Expand All @@ -62,21 +62,21 @@ async function acquireLease(instance, shardId, shardsDescription) {
version = newVersion;
} else {
logger.debug(`The lease for shard "${shardId}" couldn't be released.`);
return;
return true;
}
}

// If the shard has an owner that is still there, don't lease it.
if (leaseOwner) {
logger.debug(`The shard "${shardId}" is owned by "${leaseOwner}".`);
return;
return false;
}

// If the shard has a parent that hasn't been depleted, don't lease it.
const parentShard = parent && shards[parent];
if (parentShard && !parentShard.depleted) {
logger.debug(`Cannot lease "${shardId}", the parent "${parent}" hasn't been depleted.`);
return;
return false;
}

// Check if leasing one more shard won't go over the maximum of allowed active leases.
Expand All @@ -85,14 +85,17 @@ async function acquireLease(instance, shardId, shardsDescription) {
const maxActiveLeases = Math.ceil(shardsCount / consumersCount);
if (ownLeasesCount + 1 > maxActiveLeases) {
logger.debug(`Maximum of ${maxActiveLeases} active leases reached, cannot lease "${shardId}".`);
return true;
}

// Try to lock the shard lease.
if (await stateStore.lockShardLease(shardId, LEASE_TERM_TIMEOUT, version)) {
logger.debug(`Lease for "${shardId}" acquired.`);
} else {
logger.debug(`Can't acquire lease for "${shardId}", someone else did it.`);
return true;
}

logger.debug(`Can't acquire lease for "${shardId}", someone else did it.`);
return false;
}

async function acquireLeases(instance) {
Expand All @@ -110,9 +113,14 @@ async function acquireLeases(instance) {

const shards = await stream.getShards(privateProps);
const shardIds = Object.keys(shards);
await Promise.all(shardIds.map(shardId => acquireLease(instance, shardId, shards)));
const attempts = await Promise.all(shardIds.map(id => acquireLease(instance, id, shards)));
const changesDetected = attempts.some(Boolean);

// check reading timers vs. owned shards
if (changesDetected) {
logger.debug('At least one shard lease changed.');
} else {
logger.debug('No changes in lease acquisition.');
}

privateProps.timeoutId = setTimeout(acquireLeases, ACQUIRE_LEASES_INTERVAL, instance);
}
Expand Down
6 changes: 3 additions & 3 deletions lib/state-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,12 @@ class StateStore {

async releaseShardLease(shardId, version) {
const privateProps = internal(this);
const { docClient, logger, streamName } = privateProps;
const { client, consumerGroup, logger, streamName } = privateProps;

const releasedVersion = generate();

const params = {
Key: { streamName },
Key: { consumerGroup, streamName },
UpdateExpression: 'SET #a.#b.#c = :w, #a.#b.#d = :x, #a.#b.#e = :y',
ConditionExpression: '#a.#b.#e = :z',
ExpressionAttributeNames: {
Expand All @@ -291,7 +291,7 @@ class StateStore {
};

try {
await docClient.update(params).promise();
await client.update(params);
return releasedVersion;
} catch (err) {
if (err.code !== 'ConditionalCheckFailedException') {
Expand Down
16 changes: 8 additions & 8 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ async function isActive(props) {

if (StreamStatus === 'DELETING') {
logger.debug('Waiting for the stream to complete deletion…');
await client.waitFor('streamNotExists', params).promise();
await client.waitFor('streamNotExists', params);
logger.debug('The stream is now gone.');
return null;
}

if (StreamStatus && StreamStatus !== 'ACTIVE') {
logger.debug('Waiting for the stream to be active…');
({ StreamDescription } = await client.waitFor('streamExists', params).promise());
({ StreamDescription } = await client.waitFor('streamExists', params));
logger.debug('The stream is now active.');
}

Expand All @@ -40,9 +40,9 @@ async function activate(props) {
if (createStreamIfNeeded && streamArn === null) {
logger.debug('Trying to create the stream…');
const params = { StreamName: streamName };
await client.createStream({ ...params, ShardCount: shardCount }).promise();
await client.createStream({ ...params, ShardCount: shardCount });
logger.debug('Waiting for the new stream to be active…');
const { StreamDescription } = await client.waitFor('streamExists', params).promise();
const { StreamDescription } = await client.waitFor('streamExists', params);
logger.debug('The new stream is now active.');
return StreamDescription.StreamARN;
}
Expand All @@ -60,9 +60,9 @@ async function encrypt(props) {

if (StreamDescription.EncryptionType === 'NONE') {
logger.debug('Trying to encrypt the stream…');
await client.startStreamEncryption({ StreamName, EncryptionType, KeyId }).promise();
await client.startStreamEncryption({ StreamName, EncryptionType, KeyId });
logger.debug('Waiting for the stream to update…');
await client.waitFor('streamExists', { StreamName }).promise();
await client.waitFor('streamExists', { StreamName });
logger.debug('The stream is now encrypted.');
} else {
logger.debug('The stream is encrypted.');
Expand All @@ -73,7 +73,7 @@ async function getShards(props) {
const { logger, client, streamName } = props;
logger.debug(`Retrieving shards for the "${streamName}" stream…`);

const params = { StreamName: streamName, MaxResults: 1000 };
const params = { StreamName: streamName };
const { Shards } = await client.listShards(params);

const shards = Shards.reduce((obj, item) => {
Expand Down Expand Up @@ -101,7 +101,7 @@ async function tag(props) {
const mergedTags = { ...existingTags, ...tags };

if (!equal(existingTags, mergedTags)) {
await client.addTagsToStream({ StreamName, Tags: mergedTags }).promise();
await client.addTagsToStream({ StreamName, Tags: mergedTags });
logger.debug(`The stream tags have been updated.`);
} else {
logger.debug('The stream is already tagged as required.');
Expand Down
10 changes: 5 additions & 5 deletions lib/table.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ async function checkIfTableExists(client, logger, tableName) {
logger.debug(`The "${tableName}" table status is ${TableStatus}.`);
if (TableStatus === 'DELETING') {
logger.debug('Waiting for the table to complete deletion…');
await client.waitFor('tableNotExists', params).promise();
await client.waitFor('tableNotExists', params);
logger.debug('The table is gone.');
return null;
}
if (TableStatus && TableStatus !== 'ACTIVE') {
logger.debug('Waiting for the table to be active…');
await client.waitFor('tableExists', params).promise();
await client.waitFor('tableExists', params);
logger.debug('The table is now active.');
}
return Table.TableArn;
Expand Down Expand Up @@ -55,7 +55,7 @@ async function confirmTableTags(client, logger, tableArn, tags) {
const mergedTags = { ...existingTags, ...tags };
if (!equal(existingTags, mergedTags)) {
Tags = Object.entries(mergedTags).map(([Key, Value]) => ({ Key, Value }));
await client.tagResource({ ...params, Tags }).promise();
await client.tagResource({ ...params, Tags });
logger.debug('The table tags have been updated.');
} else {
logger.debug('The table is already tagged as required.');
Expand All @@ -73,9 +73,9 @@ async function confirmTableTags(client, logger, tableArn, tags) {
async function createTable(client, logger, tableName) {
logger.debug(`Trying to create the "${tableName}" table…`);
const params = { TableName: tableName };
await client.createTable({ ...params, ...stateTableDefinition }).promise();
await client.createTable({ ...params, ...stateTableDefinition });
logger.debug('Waiting for the new table to be active…');
const { Table } = await client.waitFor('tableExists', params).promise();
const { Table } = await client.waitFor('tableExists', params);
logger.debug('The new table is now active.');
return Table.TableArn;
}
Expand Down

0 comments on commit e64136d

Please sign in to comment.