Skip to content

Commit

Permalink
Upgrade dependencies and fix new linter exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
eaviles committed Oct 18, 2019
1 parent 8507c22 commit 750240b
Show file tree
Hide file tree
Showing 9 changed files with 536 additions and 370 deletions.
28 changes: 15 additions & 13 deletions lib/dynamodb-client.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ describe('lib/dynamodb-client', () => {
sdkClient = isDocClient ? new DynamoDB.DocumentClient() : new DynamoDB();
}

function throwErrorImplementation() {
throw error;
}

function rejectedPromiseImplementation() {
return { promise: () => Promise.reject(error) };
}

afterEach(() => {
warn.mockClear();
reportError.mockClear();
Expand Down Expand Up @@ -89,27 +81,35 @@ describe('lib/dynamodb-client', () => {

test(`${methodName} throws exceptions from the wrapped SDK call`, async () => {
error = Object.assign(new Error('foo'), { code: 'MissingRequiredParameter' });
sdkClient[methodName].mockImplementationOnce(throwErrorImplementation);
sdkClient[methodName].mockImplementationOnce(() => {
throw error;
});
await expect(client[methodName]({})).rejects.toThrow(error);
expect(reportError).toHaveBeenCalledWith('dynamoDb', error);
expect(warn).not.toHaveBeenCalled();
});

test(`${methodName} throws exceptions from the wrapped SDK promise`, async () => {
error = Object.assign(new Error('foo'), { code: 'MissingRequiredParameter' });
sdkClient[methodName].mockImplementationOnce(rejectedPromiseImplementation);
sdkClient[methodName].mockImplementationOnce(() => ({
promise: () => Promise.reject(error)
}));
await expect(client[methodName]({})).rejects.toThrow(error);
expect(reportError).toHaveBeenCalledWith('dynamoDb', error);
expect(warn).not.toHaveBeenCalled();
});

test(`${methodName} throws exceptions with a debuggable stack trace`, async () => {
error = Object.assign(new Error('foo'), { code: 'MissingRequiredParameter' });
sdkClient[methodName].mockImplementationOnce(rejectedPromiseImplementation);
sdkClient[methodName].mockImplementationOnce(() => ({
promise: () => Promise.reject(error)
}));
const stackBefore = await client[methodName]({}).catch(err => err.stack);

recreateClients(isDocClient);
sdkClient[methodName].mockImplementationOnce(rejectedPromiseImplementation);
sdkClient[methodName].mockImplementationOnce(() => ({
promise: () => Promise.reject(error)
}));
getStackObj.mockReturnValueOnce({ stack: '\n' });
const stackAfter = await client[methodName]({}).catch(err => err.stack);

Expand All @@ -119,7 +119,9 @@ describe('lib/dynamodb-client', () => {
if (isRetriable) {
test(`${methodName} retries errors from the wrapped SDK`, async () => {
error = new Error('foo');
sdkClient[methodName].mockImplementationOnce(rejectedPromiseImplementation);
sdkClient[methodName].mockImplementationOnce(() => ({
promise: () => Promise.reject(error)
}));
const promise = client[methodName]({});
await expect(promise).resolves.toEqual({});
expect(sdkClient[methodName]).toHaveBeenCalledTimes(2);
Expand Down
8 changes: 4 additions & 4 deletions lib/fan-out-consumer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ jest.mock('./stats');

jest.useFakeTimers();

function nextTickWait() {
return new Promise(resolve => setImmediate(resolve));
}

describe('lib/fan-out-consumer', () => {
const debug = jest.fn();
const error = jest.fn();
Expand Down Expand Up @@ -74,10 +78,6 @@ describe('lib/fan-out-consumer', () => {
streamName: 'test-stream'
};

function nextTickWait() {
return new Promise(resolve => setImmediate(resolve));
}

afterEach(() => {
aws.mockClear();
debug.mockClear();
Expand Down
8 changes: 4 additions & 4 deletions lib/heartbeat-manager.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

const HeartbeatManager = require('./heartbeat-manager');

function nextTickWait() {
return new Promise(resolve => setImmediate(resolve));
}

describe('lib/heartbeat-manager', () => {
const debug = jest.fn();
const error = jest.fn();
Expand All @@ -10,10 +14,6 @@ describe('lib/heartbeat-manager', () => {
const registerConsumer = jest.fn();
const stateStore = { clearOldConsumers, registerConsumer };

function nextTickWait() {
return new Promise(resolve => setImmediate(resolve));
}

beforeEach(() => {
jest.useFakeTimers();
});
Expand Down
65 changes: 34 additions & 31 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,24 @@ async function setUpEnhancedConsumers(instance) {
);
}

function parsePutRecordResult({ EncryptionType, SequenceNumber, ShardId }) {
return {
encryptionType: EncryptionType,
sequenceNumber: SequenceNumber,
shardId: ShardId
};
}

function parsePutRecordsResult({ EncryptionType, Records }) {
return {
encryptionType: EncryptionType,
records: Records.map(({ SequenceNumber, ShardId }) => ({
sequenceNumber: SequenceNumber,
shardId: ShardId
}))
};
}

/**
* A [pass-through stream]{@link external:PassThrough} class specialization implementing a consumer
* of Kinesis Data Streams using the [AWS SDK for JavaScript]{@link external:AwsJsSdk}. Incoming
Expand Down Expand Up @@ -419,21 +437,16 @@ class Kinesis extends PassThrough {
...(await privateProps.recordsEncoder(record)),
StreamName: streamName || privateProps.streamName
};
const parseResult = ({ EncryptionType, SequenceNumber, ShardId }) => ({
encryptionType: EncryptionType,
sequenceNumber: SequenceNumber,
shardId: ShardId
});
try {
return parseResult(await client.putRecord(awsParams));
return parsePutRecordResult(await client.putRecord(awsParams));
} catch (err) {
const { code } = err;
const streamDoesNotExist =
code === 'ResourceNotFoundException' ||
(code === 'UnknownError' && client.isEndpointLocal());
if (createStreamIfNeeded && streamDoesNotExist) {
await ensureStreamInitialized(this, streamName);
return parseResult(await client.putRecord(awsParams));
return parsePutRecordResult(await client.putRecord(awsParams));
}
throw err;
}
Expand All @@ -458,21 +471,18 @@ class Kinesis extends PassThrough {
StreamName: streamName || privateProps.streamName
};

const parseResult = ({ Shards }) =>
Shards.map(
({
HashKeyRange: { EndingHashKey, StartingHashKey },
SequenceNumberRange: { StartingSequenceNumber },
ShardId
}) => ({
hashKeyRange: { endingHashKey: EndingHashKey, startingHashKey: StartingHashKey },
sequenceNumberRange: { startingSequenceNumber: StartingSequenceNumber },
shardId: ShardId
})
);

const data = await client.listShards(awsParams);
return parseResult(data);
const { Shards } = await client.listShards(awsParams);
return Shards.map(
({
HashKeyRange: { EndingHashKey, StartingHashKey },
SequenceNumberRange: { StartingSequenceNumber },
ShardId
}) => ({
hashKeyRange: { endingHashKey: EndingHashKey, startingHashKey: StartingHashKey },
sequenceNumberRange: { startingSequenceNumber: StartingSequenceNumber },
shardId: ShardId
})
);
}

/**
Expand Down Expand Up @@ -506,23 +516,16 @@ class Kinesis extends PassThrough {
Records: await Promise.all(records.map(privateProps.recordsEncoder)),
StreamName: streamName || privateProps.streamName
};
const parseResult = ({ EncryptionType, Records }) => ({
encryptionType: EncryptionType,
records: Records.map(({ SequenceNumber, ShardId }) => ({
sequenceNumber: SequenceNumber,
shardId: ShardId
}))
});
try {
return parseResult(await client.putRecords(awsParams));
return parsePutRecordsResult(await client.putRecords(awsParams));
} catch (err) {
const { code } = err;
const streamDoesNotExist =
code === 'ResourceNotFoundException' ||
(code === 'UnknownError' && client.isEndpointLocal());
if (createStreamIfNeeded && streamDoesNotExist) {
await ensureStreamInitialized(this, streamName);
return parseResult(await client.putRecords(awsParams));
return parsePutRecordsResult(await client.putRecords(awsParams));
}
throw err;
}
Expand Down
77 changes: 43 additions & 34 deletions lib/kinesis-client.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,27 @@ jest.mock('./utils', () => {
return { ...utils, getStackObj: jest.fn((...args) => utils.getStackObj(...args)) };
});

let mockedSeqNum = 0;

function putTwoRecords(params) {
return {
promise: () => {
let FailedRecordCount = 0;
const Records = [];
for (let i = 0; i < params.Records.length; i += 1) {
if (i > 1) {
FailedRecordCount += 1;
Records.push({ ErrorCode: 'ProvisionedThroughputExceededException' });
} else {
Records.push({ SequenceNumber: mockedSeqNum.toString() });
mockedSeqNum += 1;
}
}
return Promise.resolve({ EncryptionType: 'foo', FailedRecordCount, Records });
}
};
}

describe('lib/kinesis-client', () => {
const warn = jest.fn();
const logger = { warn };
Expand All @@ -25,20 +46,13 @@ describe('lib/kinesis-client', () => {
sdkClient = new Kinesis();
}

function throwErrorImplementation() {
throw error;
}

function rejectedPromiseImplementation() {
return { promise: () => Promise.reject(error) };
}

afterEach(() => {
mockClear();
reportError.mockClear();
reportResponse.mockClear();
warn.mockClear();
reportRecordSent.mockClear();
mockedSeqNum = 0;
});

test('the module exports the expected', () => {
Expand Down Expand Up @@ -99,29 +113,37 @@ describe('lib/kinesis-client', () => {

test(`${methodName} throws exceptions from the wrapped SDK call`, async () => {
error = Object.assign(new Error('foo'), { code: 'MissingRequiredParameter' });
sdkClient[methodName].mockImplementationOnce(throwErrorImplementation);
sdkClient[methodName].mockImplementationOnce(() => {
throw error;
});
await expect(client[methodName]({})).rejects.toThrow(error);
expect(reportError).toHaveBeenCalledWith('kinesis', error, 'test-stream');
expect(warn).not.toHaveBeenCalled();
});

test(`${methodName} throws exceptions from the wrapped SDK promise`, async () => {
error = Object.assign(new Error('foo'), { code: 'MissingRequiredParameter' });
sdkClient[methodName].mockImplementationOnce(rejectedPromiseImplementation);
sdkClient[methodName].mockImplementationOnce(() => ({
promise: () => Promise.reject(error)
}));
await expect(client[methodName]({})).rejects.toThrow(error);
expect(reportError).toHaveBeenCalledWith('kinesis', error, 'test-stream');
expect(warn).not.toHaveBeenCalled();
});

test(`${methodName} throws exceptions with a debuggable stack trace`, async () => {
error = Object.assign(new Error('foo'), { code: 'MissingRequiredParameter' });
sdkClient[methodName].mockImplementationOnce(rejectedPromiseImplementation);
sdkClient[methodName].mockImplementationOnce(() => ({
promise: () => Promise.reject(error)
}));
const stackBefore = await client[methodName]({}).catch(err => err.stack);

client = new KinesisClient({ logger, streamName: 'test-stream' });
sdkClient = new Kinesis();

sdkClient[methodName].mockImplementationOnce(rejectedPromiseImplementation);
sdkClient[methodName].mockImplementationOnce(() => ({
promise: () => Promise.reject(error)
}));
getStackObj.mockReturnValueOnce({ stack: '\n' });
const stackAfter = await client[methodName]({}).catch(err => err.stack);

Expand All @@ -131,7 +153,9 @@ describe('lib/kinesis-client', () => {
if (isRetriable) {
test(`${methodName} retries errors from the wrapped SDK`, async () => {
error = Object.assign(new Error('foo'), { code: 'ProvisionedThroughputExceededException' });
sdkClient[methodName].mockImplementationOnce(rejectedPromiseImplementation);
sdkClient[methodName].mockImplementationOnce(() => ({
promise: () => Promise.reject(error)
}));
const promise = client[methodName]({});
await expect(promise).resolves.toEqual({});
expect(sdkClient[methodName]).toHaveBeenCalledTimes(2);
Expand All @@ -143,7 +167,9 @@ describe('lib/kinesis-client', () => {
if (methodName === 'createStream' || methodName === 'startStreamEncryption') {
test(`${methodName} should succeed if the stream is already getting updated`, async () => {
error = Object.assign(new Error('foo'), { code: 'ResourceInUseException' });
sdkClient[methodName].mockImplementationOnce(throwErrorImplementation);
sdkClient[methodName].mockImplementationOnce(() => {
throw error;
});
await expect(client[methodName]({})).resolves.toBeUndefined();
expect(reportError).toHaveBeenCalledWith('kinesis', error, 'test-stream');
expect(warn).not.toHaveBeenCalled();
Expand All @@ -153,7 +179,9 @@ describe('lib/kinesis-client', () => {
if (methodName === 'startStreamEncryption') {
test(`${methodName} should succeed if the operation is not supported`, async () => {
error = Object.assign(new Error('foo'), { code: 'UnknownOperationException' });
sdkClient[methodName].mockImplementationOnce(throwErrorImplementation);
sdkClient[methodName].mockImplementationOnce(() => {
throw error;
});
await expect(client[methodName]({})).resolves.toBeUndefined();
expect(reportError).toHaveBeenCalledWith('kinesis', error, 'test-stream');
expect(warn).not.toHaveBeenCalled();
Expand All @@ -162,25 +190,6 @@ describe('lib/kinesis-client', () => {

if (methodName === 'putRecords') {
test('putRecords should retry failed records until it succeeds', async () => {
let mockedSeqNum = 0;
function putTwoRecords(params) {
return {
promise: () => {
let FailedRecordCount = 0;
const Records = [];
for (let i = 0; i < params.Records.length; i += 1) {
if (i > 1) {
FailedRecordCount += 1;
Records.push({ ErrorCode: 'ProvisionedThroughputExceededException' });
} else {
Records.push({ SequenceNumber: mockedSeqNum.toString() });
mockedSeqNum += 1;
}
}
return Promise.resolve({ EncryptionType: 'foo', FailedRecordCount, Records });
}
};
}
sdkClient.putRecords.mockImplementationOnce(putTwoRecords);
sdkClient.putRecords.mockImplementationOnce(putTwoRecords);
await expect(
Expand Down
Loading

0 comments on commit 750240b

Please sign in to comment.