Skip to content

Commit

Permalink
Allow reading from all shards
Browse files Browse the repository at this point in the history
  • Loading branch information
eaviles committed Apr 22, 2019
1 parent ddb50fe commit 26675f8
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 137 deletions.
22 changes: 11 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,11 @@ Puts a record to a stream.
**Fulfil**: If record is successfully pushed to stream.
**Reject**: <code>Error</code> - On any unexpected error while pushing to stream.

| Param | Type | Description |
| -------------------- | ------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| options | <code>Object</code> | The putRecord options. In addition to the params described here, uses [`AWS.Kinesis.putRecord` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#putRecord-property) from the AwsJsSdk putRecord method in camelCase. |
| options.data | <code>Object</code> \| <code>string</code> | The data to be used as the Kinesis message. |
| [options.streamName] | <code>string</code> | If provided, overrides the stream name provided on client instantiation. |
| Param | Type | Description |
| -------------------- | ------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| options | <code>Object</code> | The putRecord options. In addition to the params described here, uses [`AWS.Kinesis.putRecord` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#putRecord-property) from the `AWS.Kinesis.putRecord` method in camel case. |
| options.data | <code>Object</code> \| <code>string</code> | The data to be used as the Kinesis message. |
| [options.streamName] | <code>string</code> | If provided, overrides the stream name provided on client instantiation. |

<a name="module_lifion-kinesis--Kinesis+putRecords"></a>

Expand All @@ -141,12 +141,12 @@ Batch puts multiple records to a stream.
**Fulfil**: If records are successfully pushed to stream.
**Reject**: <code>Error</code> - On any unexpected error while pushing to stream.

| Param | Type | Description |
| -------------------- | ------------------------------------------ | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| options | <code>Object</code> | The putRecords options. In addition to the params described here, uses [`AWS.Kinesis.putRecords` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#putRecords-property) from the AwsJsSdk putRecords method in camelCase. |
| options.records | <code>Array</code> | A list of records to push to a Kinesis stream. |
| options.records.data | <code>Object</code> \| <code>string</code> | The data to be used as the Kinesis message. |
| [options.streamName] | <code>string</code> | If provided, overrides the stream name provided on client instantiation. |
| Param | Type | Description |
| -------------------- | ------------------------------------------ | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| options | <code>Object</code> | The putRecords options. In addition to the params described here, uses [`AWS.Kinesis.putRecords` options](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#putRecords-property) from the `AWS.Kinesis.putRecords` method in camel case. |
| options.records | <code>Array</code> | A list of records to push to a Kinesis stream. |
| options.records.data | <code>Object</code> \| <code>string</code> | The data to be used as the Kinesis message. |
| [options.streamName] | <code>string</code> | If provided, overrides the stream name provided on client instantiation. |

## License

Expand Down
3 changes: 3 additions & 0 deletions lib/consumers-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class ConsumersManager {
const runningConsumer = consumers[shardId];
const shard = ownedShards[shardId];
if (!runningConsumer) {
logger.debug(`Starting polling consumer for "${shardId}"…`);
const consumer = new PollingConsumer({
client,
compression,
Expand All @@ -95,6 +96,7 @@ class ConsumersManager {
consumers[shardId] = consumer;
consumer.start();
} else {
logger.debug(`Updating the lease expiration for "${shardId}"…`);
runningConsumer.updateLeaseExpiration(shard.leaseExpiration);
}
});
Expand All @@ -103,6 +105,7 @@ class ConsumersManager {
Object.keys(consumers)
.filter(shardId => !ownedShards[shardId])
.forEach(shardId => {
logger.debug(`Stopping the polling consumer for "${shardId}"…`);
const consumer = consumers[shardId];
if (consumer) consumer.stop();
consumers[shardId] = undefined;
Expand Down
25 changes: 12 additions & 13 deletions lib/heartbeat-manager.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';

const HEARTBEAT_INTERVAL = 30 * 1000;
const HEARTBEAT_INTERVAL = 10 * 1000;
const HEARTBEAT_FAILURE_TIMEOUT = HEARTBEAT_INTERVAL * 3;

const privateData = new WeakMap();
Expand All @@ -10,24 +10,23 @@ function internal(instance) {
return privateData.get(instance);
}

async function heartbeat(instance) {
const privateProps = internal(instance);
const { logger, stateStore } = privateProps;

await stateStore.clearOldConsumers(HEARTBEAT_FAILURE_TIMEOUT);
await stateStore.registerConsumer();
logger.debug('Heartbeat sent.');

privateProps.timeoutId = setTimeout(heartbeat, HEARTBEAT_INTERVAL, instance);
}

class HeartbeatManager {
constructor({ consumerId, logger, stateStore }) {
Object.assign(internal(this), { consumerId, logger, stateStore });
}

async start() {
await heartbeat(this);
const privateProps = internal(this);
const { logger, stateStore } = privateProps;

const heartbeat = async () => {
await stateStore.clearOldConsumers(HEARTBEAT_FAILURE_TIMEOUT);
await stateStore.registerConsumer();
logger.debug('Heartbeat sent.');
privateProps.timeoutId = setTimeout(heartbeat, HEARTBEAT_INTERVAL);
};

await heartbeat();
}

stop() {
Expand Down
16 changes: 8 additions & 8 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,12 @@ class Kinesis extends PassThrough {
/**
* Puts a record to a stream.
*
* @param {Object} options - The putRecord options. In addition to the params described
* here, uses [`AWS.Kinesis.putRecord` options]{@link external:AwsJsSdkKinesisPutRecord}
* from the AwsJsSdk putRecord method in camelCase.
* @param {Object} options - The putRecord options. In addition to the params described here,
* uses [`AWS.Kinesis.putRecord` options]{@link external:AwsJsSdkKinesisPutRecord}
* from the `AWS.Kinesis.putRecord` method in camel case.
* @param {(Object|string)} options.data - The data to be used as the Kinesis message.
* @param {string} [options.streamName] - If provided, overrides the stream name provided
* on client instantiation.
* @param {string} [options.streamName] - If provided, overrides the stream name provided on
* client instantiation.
* @fulfil If record is successfully pushed to stream.
* @reject {Error} - On any unexpected error while pushing to stream.
* @returns {Promise}
Expand All @@ -251,9 +251,9 @@ class Kinesis extends PassThrough {
/**
* Batch puts multiple records to a stream.
*
* @param {Object} options - The putRecords options. In addition to the params described
* here, uses [`AWS.Kinesis.putRecords` options]{@link external:AwsJsSdkKinesisPutRecords}
* from the AwsJsSdk putRecords method in camelCase.
* @param {Object} options - The putRecords options. In addition to the params described here,
* uses [`AWS.Kinesis.putRecords` options]{@link external:AwsJsSdkKinesisPutRecords}
* from the `AWS.Kinesis.putRecords` method in camel case.
* @param {Array} options.records - A list of records to push to a Kinesis stream.
* @param {(Object|string)} options.records.data - The data to be used as the Kinesis message.
* @param {string} [options.streamName] - If provided, overrides the stream name provided
Expand Down
93 changes: 51 additions & 42 deletions lib/lease-manager.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
'use strict';

const ACQUIRE_LEASES_INTERVAL = 30 * 1000;
const LEASE_TERM_TIMEOUT = 5 * 60 * 1000;
const LEASE_RENEWAL_OFFSET = Math.round(LEASE_TERM_TIMEOUT * 0.25);
const ACQUIRE_LEASES_INTERVAL = 10 * 1000;
const LEASE_TERM_TIMEOUT = 60 * 1000;
const LEASE_RENEWAL_OFFSET = ACQUIRE_LEASES_INTERVAL * 3;

const { checkIfStreamExists, getStreamShards } = require('./stream');

Expand All @@ -22,7 +22,7 @@ function stopManager(instance) {

async function acquireLease(instance, shardId, shardsDescription) {
const privateProps = internal(instance);
const { consumerId, logger, stateStore } = privateProps;
const { consumerId, logger, isStandalone, stateStore } = privateProps;

// Retrieve the state of the shard and the stream.
const shardDescription = shardsDescription[shardId];
Expand Down Expand Up @@ -87,12 +87,14 @@ async function acquireLease(instance, shardId, shardsDescription) {
}

// Check if leasing one more shard won't go over the maximum of allowed active leases.
const shardsCount = Object.keys(shards).length;
const consumersCount = Object.keys(consumers).length;
const maxActiveLeases = Math.ceil(shardsCount / consumersCount);
if (ownLeasesCount + 1 > maxActiveLeases) {
logger.debug(`Maximum of ${maxActiveLeases} active leases reached, cannot lease "${shardId}".`);
return true;
if (!isStandalone) {
const shardsCount = Object.keys(shards).length;
const consumersCount = Object.values(consumers).filter(i => !i.isStandalone).length;
const maxActiveLeases = Math.ceil(shardsCount / consumersCount);
if (ownLeasesCount + 1 > maxActiveLeases) {
logger.debug(`Max. of ${maxActiveLeases} active leases reached, can't lease "${shardId}".`);
return true;
}
}

// Try to lock the shard lease.
Expand All @@ -105,52 +107,59 @@ async function acquireLease(instance, shardId, shardsDescription) {
return false;
}

async function acquireLeases(instance) {
const privateProps = internal(instance);
const { logger } = privateProps;

logger.debug('Trying to acquire leases…');
const { streamArn } = await checkIfStreamExists(privateProps);

if (streamArn === null) {
logger.debug("Can't acquire leases as the stream is gone.");
stopManager(instance);
return;
}

const shards = await getStreamShards(privateProps);
const shardIds = Object.keys(shards).sort();
const attempts = await shardIds.reduce(async (result, id) => {
return (await result).concat(await acquireLease(instance, id, shards));
}, []);
const changesDetected = attempts.some(Boolean);

if (changesDetected) {
logger.debug('At least one shard lease changed.');
privateProps.consumersManager.reconcile({ changesDetected });
} else {
logger.debug('No changes in lease acquisition.');
}

privateProps.timeoutId = setTimeout(acquireLeases, ACQUIRE_LEASES_INTERVAL, instance);
}

class LeaseManager {
constructor(options) {
const { client, consumerId, consumersManager, logger, stateStore, streamName } = options;
const {
client,
consumerId,
consumersManager,
logger,
stateStore,
streamName,
useAutoShardAssignment
} = options;

Object.assign(internal(this), {
client,
consumerId,
consumersManager,
isStandalone: !useAutoShardAssignment,
logger,
stateStore,
streamName
});
}

async start() {
await acquireLeases(this);
const privateProps = internal(this);
const { consumersManager, logger } = privateProps;

const acquireLeases = async () => {
logger.debug('Trying to acquire leases…');
const { streamArn } = await checkIfStreamExists(privateProps);

if (streamArn === null) {
logger.debug("Can't acquire leases as the stream is gone.");
stopManager(this);
return;
}

const shards = await getStreamShards(privateProps);
const changesDetected = (await Object.keys(shards).reduce(async (result, id) => {
return (await result).concat(await acquireLease(this, id, shards));
}, [])).some(Boolean);

if (changesDetected) {
logger.debug('At least one shard lease changed.');
consumersManager.reconcile();
} else {
logger.debug('No changes in lease acquisition.');
}

privateProps.timeoutId = setTimeout(acquireLeases, ACQUIRE_LEASES_INTERVAL);
};

await acquireLeases();
}

stop() {
Expand Down
4 changes: 3 additions & 1 deletion lib/polling-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ async function pollForRecords(instance) {
return;
}

const noMsgsDelay = millisBehindLatest <= 0 ? noRecordsPollDelay : 250;
const noMsgsDelay = millisBehindLatest <= 0 ? noRecordsPollDelay : 0;
if (noMsgsDelay === 0)
logger.debug(`Fast-forwarding "${shardId}"… (${millisBehindLatest}ms behind)`);
privateProps.timeoutId = setTimeout(pollForRecords, noMsgsDelay, instance);
return;
}
Expand Down
Loading

0 comments on commit 26675f8

Please sign in to comment.