Skip to content

Commit

Permalink
Add stats support
Browse files Browse the repository at this point in the history
  • Loading branch information
Mackenzie Turner authored and Mackenzie Turner committed Apr 26, 2019
1 parent 7505973 commit 328cd80
Show file tree
Hide file tree
Showing 7 changed files with 444 additions and 115 deletions.
50 changes: 43 additions & 7 deletions lib/dynamodb-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const retry = require('async-retry');
const { DynamoDB } = require('aws-sdk');
const { reportException, reportSuccess: reportSuccesStats } = require('./stats');

const privateData = new WeakMap();

Expand All @@ -21,6 +22,11 @@ function shouldBailRetry(err) {
);
}

const reportSuccess = result => {
reportSuccesStats('dynamoDb');
return result;
};

class DynamoDbClient {
constructor({ awsOptions, logger, tableName }) {
const client = new DynamoDB(awsOptions);
Expand Down Expand Up @@ -51,10 +57,12 @@ class DynamoDbClient {
return internal(this)
.client.createTable(...args)
.promise()
.then(reportSuccess)
.catch(err => {
const { code, message } = err;
const error = new Error(message);
error.code = code;
reportException(err, 'dynamoDb');
throw error;
});
}
Expand All @@ -66,10 +74,14 @@ class DynamoDbClient {
client
.describeTable(...args)
.promise()
.then(reportSuccess)
.catch(err => {
const { code } = err;
if (code === 'ResourceNotFoundException' || shouldBailRetry(err)) bail();
else throw err;
if (code === 'ResourceNotFoundException' || shouldBailRetry(err)) bail(err);
else {
reportException(err, 'dynamoDb');
throw err;
}
}),
retryOpts
).catch(err => {
Expand All @@ -87,9 +99,13 @@ class DynamoDbClient {
docClient
.get(...args)
.promise()
.then(reportSuccess)
.catch(err => {
if (shouldBailRetry(err)) bail(err);
else throw err;
else {
reportException(err, 'dynamoDb');
throw err;
}
}),
retryOpts
).catch(err => {
Expand All @@ -107,9 +123,13 @@ class DynamoDbClient {
client
.listTagsOfResource(...args)
.promise()
.then(reportSuccess)
.catch(err => {
if (shouldBailRetry(err)) bail(err);
else throw err;
else {
reportException(err, 'dynamoDb');
throw err;
}
}),
retryOpts
).catch(err => {
Expand All @@ -127,10 +147,14 @@ class DynamoDbClient {
docClient
.put(...args)
.promise()
.then(reportSuccess)
.catch(err => {
const { code } = err;
if (code === 'ConditionalCheckFailedException' || shouldBailRetry(err)) bail(err);
else throw err;
else {
reportException(err, 'dynamoDb');
throw err;
}
}),
retryOpts
).catch(err => {
Expand All @@ -145,10 +169,12 @@ class DynamoDbClient {
return internal(this)
.client.tagResource(...args)
.promise()
.then(reportSuccess)
.catch(err => {
const { code, message } = err;
const error = new Error(message);
error.code = code;
reportException(err, 'dynamoDb');
throw error;
});
}
Expand All @@ -160,10 +186,14 @@ class DynamoDbClient {
docClient
.update(...args)
.promise()
.then(reportSuccess)
.catch(err => {
const { code } = err;
if (code === 'ConditionalCheckFailedException' || shouldBailRetry(err)) bail(err);
else throw err;
else {
reportException(err, 'dynamoDb');
throw err;
}
}),
retryOpts
).catch(err => {
Expand All @@ -181,9 +211,13 @@ class DynamoDbClient {
client
.waitFor(...args)
.promise()
.then(reportSuccess)
.catch(err => {
if (shouldBailRetry(err)) bail(err);
else throw err;
else {
reportException(err, 'dynamoDb');
throw err;
}
}),
retryOpts
).catch(err => {
Expand All @@ -198,10 +232,12 @@ class DynamoDbClient {
return internal(this)
.docClient.delete(...args)
.promise()
.then(reportSuccess)
.catch(err => {
const { code, message } = err;
const error = new Error(message);
error.code = code;
reportException(err, 'dynamoDb');
throw error;
});
}
Expand Down
3 changes: 3 additions & 0 deletions lib/fan-out-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const { promisify } = require('util');
const { RecordsDecoder } = require('./records');
const { getStreamShards } = require('./stream');
const { safeJsonParse, wait } = require('./utils');
const { reportException, reportSuccess } = require('./stats');

const AWS_API_TARGET = 'Kinesis_20131202.SubscribeToShard';
const AWS_EVENT_STREAM = 'application/vnd.amazon.eventstream';
Expand Down Expand Up @@ -155,9 +156,11 @@ class FanOutConsumer {
if (headers['content-type'] !== AWS_EVENT_STREAM || statusCode !== 200) {
logger.error(`Subscription unsuccessful: ${statusCode}`);
requestFlags.isEventStream = false;
reportException({ statusCode }, 'kinesis', streamName);
} else {
logger.debug('Subscription to shard is successful.');
requestFlags.isEventStream = true;
reportSuccess('kinesis', streamName);
}
};

Expand Down
25 changes: 21 additions & 4 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const StateStore = require('./state-store');
const { getRecordsEncoder } = require('./records');
const { name: moduleName } = require('../package.json');
const { noop } = require('./utils');
const { getStats, reportRecordConsumed, startStatsEmitter } = require('./stats');

const MAX_ENHANCED_CONSUMERS = 5;

Expand All @@ -38,6 +39,8 @@ function internal(instance) {

async function ensureStreamInitialized(instance) {
const privateProps = internal(instance);
startStatsEmitter(instance, privateProps);

const { encryption, streamArn, streamCreatedOn, tags } = privateProps;
if (streamArn && streamCreatedOn) return;

Expand Down Expand Up @@ -170,6 +173,7 @@ class Kinesis extends PassThrough {
noRecordsPollDelay = 1000,
pollDelay = 250,
shardCount,
statsInterval,
streamName,
tags,
useAutoCheckpoints = true,
Expand Down Expand Up @@ -197,7 +201,7 @@ class Kinesis extends PassThrough {

Object.assign(internal(this), {
awsOptions,
client: new KinesisClient({ awsOptions, logger: normLogger }),
client: new KinesisClient({ awsOptions, logger: normLogger, streamName }),
compression,
consumerGroup,
consumerId: generate(),
Expand All @@ -209,6 +213,7 @@ class Kinesis extends PassThrough {
pollDelay: normPollDelay >= 0 ? normPollDelay : 250,
recordsEncoder: getRecordsEncoder(compression, 'Buffer'),
shardCount: normShardCount >= 1 ? normShardCount : 1,
statsInterval,
streamName,
tags,
useAutoCheckpoints: Boolean(useAutoCheckpoints),
Expand All @@ -229,7 +234,7 @@ class Kinesis extends PassThrough {
*/
async startConsumer() {
const privateProps = internal(this);
const { logger, useEnhancedFanOut } = privateProps;
const { logger, streamName, useEnhancedFanOut } = privateProps;

await ensureStreamInitialized(this);

Expand All @@ -247,7 +252,10 @@ class Kinesis extends PassThrough {

privateProps.pushToStream = (err, ...args) => {
if (err) this.emit('error', err);
else this.push(...args);
else {
this.push(...args);
reportRecordConsumed(streamName);
}
};

const consumersManager = new ConsumersManager(privateProps);
Expand Down Expand Up @@ -277,7 +285,7 @@ class Kinesis extends PassThrough {
const privateProps = internal(this);
const { client, recordsEncoder } = privateProps;
await ensureStreamInitialized(this);
return client.putRecord({
await client.putRecord({
...(await recordsEncoder(record)),
StreamName: streamName || privateProps.streamName
});
Expand Down Expand Up @@ -306,6 +314,15 @@ class Kinesis extends PassThrough {
StreamName: streamName || privateProps.streamName
});
}

getStats() {
const { streamName } = internal(this);
return getStats(streamName);
}

static getStats() {
return getStats();
}
}

/**
Expand Down
Loading

0 comments on commit 328cd80

Please sign in to comment.