Skip to content

Commit

Permalink
Use describeStreamSummary which doubles the limits of transactions …
Browse files Browse the repository at this point in the history
…per second
  • Loading branch information
pjlevitt committed Apr 14, 2020
1 parent e6717f6 commit 7f2950f
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 23 deletions.
3 changes: 3 additions & 0 deletions lib/__mocks__/aws-sdk.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const addTagsToStream = createResponseMock();
const createStream = createResponseMock();
const deregisterStreamConsumer = createResponseMock();
const describeStream = createResponseMock();
const describeStreamSummary = createResponseMock();
const getRecords = createResponseMock();
const getShardIterator = createResponseMock();
const isEndpointLocal = createResponseMock();
Expand All @@ -62,6 +63,7 @@ const Kinesis = jest.fn(({ endpoint } = {}) => {
createStream,
deregisterStreamConsumer,
describeStream,
describeStreamSummary,
endpoint: new URL(endpoint || 'https://kinesis.amazonaws.com/'),
getRecords,
getShardIterator,
Expand Down Expand Up @@ -119,6 +121,7 @@ function mockClear() {
createStream.mockClear();
deregisterStreamConsumer.mockClear();
describeStream.mockClear();
describeStreamSummary.mockClear();
getRecords.mockClear();
getShardIterator.mockClear();
isEndpointLocal.mockClear();
Expand Down
11 changes: 11 additions & 0 deletions lib/kinesis-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,17 @@ class KinesisClient {
return retriableSdkCall(client, 'describeStream', streamName, retryOpts, ...args);
}

/**
* Summarizes the specified Kinesis data stream.
*
* @param {...*} args - The arguments.
* @returns {Promise}
*/
describeStreamSummary(...args) {
const { client, retryOpts, streamName } = internal(this);
return retriableSdkCall(client, 'describeStreamSummary', streamName, retryOpts, ...args);
}

/**
* Gets data records from a Kinesis data stream's shard.
*
Expand Down
2 changes: 2 additions & 0 deletions lib/kinesis-client.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ describe('lib/kinesis-client', () => {
'createStream',
'deregisterStreamConsumer',
'describeStream',
'describeStreamSummary',
'getRecords',
'getShardIterator',
'isEndpointLocal',
Expand Down Expand Up @@ -92,6 +93,7 @@ describe('lib/kinesis-client', () => {
${'createStream'} | ${false}
${'deregisterStreamConsumer'} | ${false}
${'describeStream'} | ${true}
${'describeStreamSummary'} | ${true}
${'getRecords'} | ${true}
${'getShardIterator'} | ${true}
${'listShards'} | ${true}
Expand Down
8 changes: 4 additions & 4 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ const wait = promisify(setTimeout);
async function checkIfStreamExists({ client, logger, streamName }) {
try {
const params = { StreamName: streamName };
const { StreamDescription } = await client.describeStream(params);
const { StreamARN, StreamCreationTimestamp, StreamStatus } = StreamDescription;
const { StreamDescriptionSummary } = await client.describeStreamSummary(params);
const { StreamARN, StreamCreationTimestamp, StreamStatus } = StreamDescriptionSummary;

if (StreamStatus === 'DELETING') {
logger.debug('Waiting for the stream to complete deletion…');
Expand Down Expand Up @@ -108,9 +108,9 @@ async function ensureStreamEncription(params) {
const { client, encryption, logger, streamName: StreamName } = params;
const { keyId: KeyId, type: EncryptionType } = encryption;

const { StreamDescription } = await client.describeStream({ StreamName });
const { StreamDescriptionSummary } = await client.describeStreamSummary({ StreamName });

if (StreamDescription.EncryptionType === 'NONE') {
if (StreamDescriptionSummary.EncryptionType === 'NONE') {
logger.debug('Trying to encrypt the stream…');
await client.startStreamEncryption({ EncryptionType, KeyId, StreamName });
logger.debug('Waiting for the stream to update…');
Expand Down
45 changes: 26 additions & 19 deletions lib/stream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ describe('lib/stream', () => {

const addTagsToStream = jest.fn();
const createStream = jest.fn();
const describeStream = jest.fn();
const describeStreamSummary = jest.fn();
const listShards = jest.fn();
const listStreamConsumers = jest.fn();
const listTagsForStream = jest.fn();
Expand All @@ -37,7 +37,7 @@ describe('lib/stream', () => {
const client = {
addTagsToStream,
createStream,
describeStream,
describeStreamSummary,
listShards,
listStreamConsumers,
listTagsForStream,
Expand All @@ -60,7 +60,7 @@ describe('lib/stream', () => {
addTagsToStream.mockClear();
createStream.mockClear();
debug.mockClear();
describeStream.mockClear();
describeStreamSummary.mockClear();
errorMock.mockClear();
listShards.mockClear();
listStreamConsumers.mockClear();
Expand All @@ -84,8 +84,8 @@ describe('lib/stream', () => {
});

test('checkIfStreamExists returns the ARN and the creation timestamp of a stream', async () => {
describeStream.mockResolvedValueOnce({
StreamDescription: {
describeStreamSummary.mockResolvedValueOnce({
StreamDescriptionSummary: {
StreamARN: 'foo',
StreamCreationTimestamp: new Date('2019-01-01'),
StreamStatus: 'ACTIVE'
Expand All @@ -95,26 +95,26 @@ describe('lib/stream', () => {
streamArn: 'foo',
streamCreatedOn: '2019-01-01T00:00:00.000Z'
});
expect(describeStream).toHaveBeenCalledWith({ StreamName: 'test-stream' });
expect(describeStreamSummary).toHaveBeenCalledWith({ StreamName: 'test-stream' });
});

test("checkIfStreamExists returns a null ARN if the stream doesn't exist", async () => {
const error = Object.assign(new Error('foo'), { code: 'ResourceNotFoundException' });
describeStream.mockRejectedValueOnce(error);
describeStreamSummary.mockRejectedValueOnce(error);
await expect(checkIfStreamExists(commonParams)).resolves.toEqual({ streamArn: null });
expect(errorMock).not.toHaveBeenCalled();
});

test('checkIfStreamExists throws errors from the internal calls', async () => {
const error = new Error('foo');
describeStream.mockRejectedValueOnce(error);
describeStreamSummary.mockRejectedValueOnce(error);
await expect(checkIfStreamExists(commonParams)).rejects.toBe(error);
expect(errorMock).toHaveBeenCalledWith(error);
});

test('checkIfStreamExists waits until the stream is deleted before resolving', async () => {
describeStream.mockResolvedValueOnce({
StreamDescription: {
describeStreamSummary.mockResolvedValueOnce({
StreamDescriptionSummary: {
StreamARN: 'foo',
StreamCreationTimestamp: new Date('2019-01-01'),
StreamStatus: 'DELETING'
Expand All @@ -129,8 +129,8 @@ describe('lib/stream', () => {
});

test('checkIfStreamExists resolves until the stream finishes updating', async () => {
describeStream.mockResolvedValueOnce({
StreamDescription: {
describeStreamSummary.mockResolvedValueOnce({
StreamDescriptionSummary: {
StreamARN: 'foo',
StreamCreationTimestamp: new Date('2019-01-01'),
StreamStatus: 'UPDATING'
Expand Down Expand Up @@ -169,7 +169,9 @@ describe('lib/stream', () => {
});

test('ensureStreamEncription will encrypt a non-encrypted stream', async () => {
describeStream.mockResolvedValueOnce({ StreamDescription: { EncryptionType: 'NONE' } });
describeStreamSummary.mockResolvedValueOnce({
StreamDescriptionSummary: { EncryptionType: 'NONE' }
});
await expect(
ensureStreamEncription({ ...commonParams, encryption: { keyId: 'foo', type: 'bar' } })
).resolves.toBeUndefined();
Expand All @@ -186,7 +188,9 @@ describe('lib/stream', () => {
});

test("ensureStreamEncription won't try to encrypt an already encrypted stream", async () => {
describeStream.mockResolvedValueOnce({ StreamDescription: { EncryptionType: 'foo' } });
describeStreamSummary.mockResolvedValueOnce({
StreamDescriptionSummary: { EncryptionType: 'foo' }
});
await expect(
ensureStreamEncription({ ...commonParams, encryption: { keyId: 'bar', type: 'baz' } })
).resolves.toBeUndefined();
Expand All @@ -195,8 +199,8 @@ describe('lib/stream', () => {
});

test("ensureStreamExists won't try to create a stream if it already exists", async () => {
describeStream.mockResolvedValueOnce({
StreamDescription: {
describeStreamSummary.mockResolvedValueOnce({
StreamDescriptionSummary: {
StreamARN: 'foo',
StreamCreationTimestamp: new Date('2019-01-01'),
StreamStatus: 'ACTIVE'
Expand All @@ -213,7 +217,7 @@ describe('lib/stream', () => {
});

test("ensureStreamExists tries to create a stream if it doesn't exists", async () => {
describeStream.mockRejectedValueOnce(
describeStreamSummary.mockRejectedValueOnce(
Object.assign(new Error('foo'), { code: 'ResourceNotFoundException' })
);
waitFor.mockResolvedValueOnce({
Expand All @@ -234,11 +238,14 @@ describe('lib/stream', () => {
});

test("ensureStreamExists won't create a stream if it doesn't exists when opted out", async () => {
describeStream.mockRejectedValueOnce(
describeStreamSummary.mockRejectedValueOnce(
Object.assign(new Error('foo'), { code: 'ResourceNotFoundException' })
);
waitFor.mockResolvedValueOnce({
StreamDescription: { StreamARN: 'bar', StreamCreationTimestamp: new Date('2019-01-01') }
StreamDescriptionSummary: {
StreamARN: 'bar',
StreamCreationTimestamp: new Date('2019-01-01')
}
});
await expect(
ensureStreamExists({ ...commonParams, createStreamIfNeeded: false })
Expand Down

0 comments on commit 7f2950f

Please sign in to comment.