-
Notifications
You must be signed in to change notification settings - Fork 18
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
48 additions
and
314 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,121 +1,10 @@ | ||
'use strict'; | ||
|
||
const { Kinesis: AwsKinesis } = require('aws-sdk'); | ||
const Kinesis = require('.'); | ||
// const consumer = require('./consumer'); | ||
const stream = require('./stream'); | ||
const utils = require('./utils'); | ||
|
||
jest.mock('./consumer'); | ||
jest.mock('./stream'); | ||
jest.mock('./utils'); | ||
jest.mock('./shard-subscriber'); | ||
|
||
describe('lib/index', () => { | ||
const consumerName = 'foo'; | ||
const streamName = 'bar'; | ||
|
||
afterEach(() => { | ||
AwsKinesis.mockClear(); | ||
// consumer.activate.mockClear(); | ||
stream.activate.mockClear(); | ||
stream.encrypt.mockClear(); | ||
stream.getShards.mockClear(); | ||
stream.tag.mockClear(); | ||
utils.noop.mockClear(); | ||
}); | ||
|
||
test('the module exports the expected', () => { | ||
expect(Kinesis).toEqual(expect.any(Function)); | ||
expect(Kinesis).toThrow("Class constructor Kinesis cannot be invoked without 'new'"); | ||
}); | ||
|
||
test('the constructor should throw if called without a "consumerName"', () => { | ||
expect(() => new Kinesis()).toThrow('The "consumerName" option is required.'); | ||
}); | ||
|
||
test('the constructor should throw if called without a "streamName"', () => { | ||
expect(() => new Kinesis({ consumerName })).toThrow('The "streamName" option is required.'); | ||
}); | ||
|
||
test('the constructor should be able to initialize an instance', () => { | ||
let instance; | ||
expect(() => { | ||
instance = new Kinesis({ consumerName, streamName }); | ||
}).not.toThrow(); | ||
expect(instance).toEqual(expect.any(Kinesis)); | ||
}); | ||
|
||
test('connect should return a promise', async () => { | ||
const kinesis = new Kinesis({ consumerName, streamName }); | ||
const promise = kinesis.connect(); | ||
expect(promise).toEqual(expect.any(Promise)); | ||
await expect(promise).resolves.not.toBeDefined(); | ||
}); | ||
|
||
test('connect should instantiate an AWS SDK Kinesis object', async () => { | ||
const kinesis = new Kinesis({ consumerName, streamName, foo: 'bar' }); | ||
await kinesis.connect(); | ||
expect(AwsKinesis).toHaveBeenCalledWith({ foo: 'bar' }); | ||
}); | ||
|
||
test('connect should activate a stream', async () => { | ||
const kinesis = new Kinesis({ consumerName, streamName }); | ||
await kinesis.connect(); | ||
expect(stream.activate).toHaveBeenCalledWith({ | ||
client: expect.any(Object), | ||
compression: undefined, | ||
consumerArn: undefined, | ||
consumerName, | ||
createStreamIfNeeded: true, | ||
encryption: undefined, | ||
logger: expect.any(Object), | ||
options: {}, | ||
shardCount: 1, | ||
shards: [], | ||
streamArn: undefined, | ||
streamName: 'bar', | ||
tags: undefined | ||
}); | ||
}); | ||
|
||
test('connect should use a noop logger when not provided one', async () => { | ||
const kinesis = new Kinesis({ consumerName, streamName }); | ||
await kinesis.connect(); | ||
const [[{ logger }]] = stream.activate.mock.calls; | ||
const { noop } = utils; | ||
expect(logger).toEqual({ debug: noop, error: noop, warn: noop }); | ||
expect(noop.mock.calls).toEqual([ | ||
['Trying to connect the client…'], | ||
['Creating subscribers for the stream shards using "foo"…'], | ||
['The client is now connected.'] | ||
]); | ||
}); | ||
|
||
test('connect should use the provided logger', async () => { | ||
const logger = { debug: jest.fn(), error: jest.fn(), warn: jest.fn() }; | ||
await new Kinesis({ consumerName, streamName, logger }).connect(); | ||
expect(utils.noop).not.toHaveBeenCalled(); | ||
expect(logger.debug.mock.calls).toEqual([ | ||
['Trying to connect the client…'], | ||
['Creating subscribers for the stream shards using "foo"…'], | ||
['The client is now connected.'] | ||
]); | ||
expect(() => new Kinesis({ logger })).toThrow(); | ||
expect(logger.error.mock.calls[0][0]).toEqual('The "consumerName" option is required.'); | ||
}); | ||
|
||
test('connect should encrypt a stream if provided with the encryption options', async () => { | ||
await new Kinesis({ consumerName, streamName, encryption: { foo: 'bar' } }).connect(); | ||
expect(stream.encrypt).toHaveBeenCalled(); | ||
const [[{ encryption }]] = stream.encrypt.mock.calls; | ||
expect(encryption).toEqual({ foo: 'bar' }); | ||
}); | ||
|
||
test('connect should tag a stream if provided with the tags option', async () => { | ||
await new Kinesis({ consumerName, streamName, tags: { foo: 'bar' } }).connect(); | ||
expect(stream.tag).toHaveBeenCalled(); | ||
const [[{ tags }]] = stream.tag.mock.calls; | ||
expect(tags).toEqual({ foo: 'bar' }); | ||
}); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,183 +1,15 @@ | ||
'use strict'; | ||
|
||
const { Kinesis, mockClear, mockStreams } = require('aws-sdk'); | ||
const stream = require('./stream'); | ||
|
||
describe('lib/stream', () => { | ||
let client; | ||
let logger; | ||
let ctx; | ||
|
||
beforeEach(() => { | ||
client = new Kinesis(); | ||
logger = { debug: jest.fn(), error: jest.fn() }; | ||
ctx = { | ||
createStreamIfNeeded: true, | ||
client, | ||
logger, | ||
shardCount: 1, | ||
streamName: 'foo' | ||
}; | ||
}); | ||
|
||
afterEach(() => { | ||
mockClear(); | ||
}); | ||
|
||
test('the module exports the expected', () => { | ||
expect(stream).toEqual({ | ||
activate: expect.any(Function), | ||
encrypt: expect.any(Function), | ||
getShards: expect.any(Function), | ||
tag: expect.any(Function) | ||
}); | ||
}); | ||
|
||
test("activate creates a stream if it's doesn't exists and auto-create is on", async () => { | ||
await expect(stream.activate(ctx)).resolves.toMatch(/^arn:aws:kinesis/); | ||
expect(client.createStream).toHaveBeenCalledWith({ ShardCount: 1, StreamName: 'foo' }); | ||
expect(logger.debug.mock.calls).toEqual([ | ||
['Checking if the stream "foo" exists…'], | ||
['Trying to create the stream…'], | ||
['Waiting for the new stream to be active…'], | ||
['The new stream is now active.'] | ||
]); | ||
}); | ||
|
||
test("activate throws if a stream doesn't exists and auto-create is off", async () => { | ||
ctx.createStreamIfNeeded = false; | ||
await expect(stream.activate(ctx)).rejects.toThrow("The stream doesn't exists."); | ||
expect(client.createStream).not.toHaveBeenCalledWith(); | ||
expect(logger.debug.mock.calls).toEqual([['Checking if the stream "foo" exists…']]); | ||
const [[{ code, message }]] = logger.error.mock.calls; | ||
expect(code).toBe('ResourceNotFoundException'); | ||
expect(message).toBe("The stream doesn't exists."); | ||
}); | ||
|
||
test("activate won't try to create a stream if it exists already", async () => { | ||
const mockStream = { StreamARN: 'bar', StreamName: 'foo', StreamStatus: 'ACTIVE' }; | ||
mockStreams().push(mockStream); | ||
await expect(stream.activate(ctx)).resolves.toBe('bar'); | ||
expect(client.createStream).not.toHaveBeenCalledWith(); | ||
expect(logger.debug.mock.calls).toEqual([ | ||
['Checking if the stream "foo" exists…'], | ||
['The stream status is ACTIVE.'] | ||
]); | ||
}); | ||
|
||
test("activate waits for a stream if it's in creating state", async () => { | ||
const mockStream = { StreamARN: 'bar', StreamName: 'foo', StreamStatus: 'CREATING' }; | ||
mockStreams().push(mockStream); | ||
await expect(stream.activate(ctx)).resolves.toMatch('bar'); | ||
expect(client.createStream).not.toHaveBeenCalled(); | ||
expect(logger.debug.mock.calls).toEqual([ | ||
['Checking if the stream "foo" exists…'], | ||
['The stream status is CREATING.'], | ||
['Waiting for the stream to be active…'], | ||
['The stream is now active.'] | ||
]); | ||
}); | ||
|
||
test('activate waits for a stream in deleting state before trying to create it', async () => { | ||
const mockStream = { StreamARN: 'bar', StreamName: 'foo', StreamStatus: 'DELETING' }; | ||
mockStreams().push(mockStream); | ||
await expect(stream.activate(ctx)).resolves.toMatch('bar'); | ||
expect(client.createStream).toHaveBeenCalledWith({ ShardCount: 1, StreamName: 'foo' }); | ||
expect(logger.debug.mock.calls).toEqual([ | ||
['Checking if the stream "foo" exists…'], | ||
['The stream status is DELETING.'], | ||
['Waiting for the stream to complete deletion…'], | ||
['The stream is now gone.'], | ||
['Trying to create the stream…'], | ||
['Waiting for the new stream to be active…'], | ||
['The new stream is now active.'] | ||
]); | ||
}); | ||
|
||
test('encrypt will start the stream encryption if not previously encrypted', async () => { | ||
ctx.encryption = { type: 'baz', keyId: 'qux' }; | ||
const mockStream = { StreamName: 'foo', StreamStatus: 'ACTIVE', EncryptionType: 'NONE' }; | ||
mockStreams().push(mockStream); | ||
await expect(stream.encrypt(ctx)).resolves.not.toBeDefined(); | ||
expect(client.startStreamEncryption).toHaveBeenCalledWith({ | ||
EncryptionType: 'baz', | ||
KeyId: 'qux', | ||
StreamName: 'foo' | ||
}); | ||
expect(logger.debug.mock.calls).toEqual([ | ||
['Checking if the stream "foo" is encrypted…'], | ||
['Trying to encrypt the stream…'], | ||
['Waiting for the stream to update…'], | ||
['The stream is now encrypted.'] | ||
]); | ||
}); | ||
|
||
test("encrypt won't try to encrypt the stream if it's already encrypted", async () => { | ||
ctx.encryption = { type: 'baz', keyId: 'qux' }; | ||
const mockStream = { StreamName: 'foo', StreamStatus: 'ACTIVE', EncryptionType: 'KMS' }; | ||
mockStreams().push(mockStream); | ||
await expect(stream.encrypt(ctx)).resolves.not.toBeDefined(); | ||
expect(client.startStreamEncryption).not.toHaveBeenCalled(); | ||
expect(logger.debug.mock.calls).toEqual([ | ||
['Checking if the stream "foo" is encrypted…'], | ||
['The stream is encrypted.'] | ||
]); | ||
}); | ||
|
||
test("tag updates the stream if it's not already tagged", async () => { | ||
ctx.tags = { baz: 'qux', quux: 'quuz', corge: 'grault' }; | ||
const mockStream = { | ||
StreamName: 'foo', | ||
StreamStatus: 'ACTIVE' | ||
}; | ||
mockStreams().push(mockStream); | ||
await expect(stream.tag(ctx)).resolves.not.toBeDefined(); | ||
expect(client.addTagsToStream).toHaveBeenCalledWith({ | ||
StreamName: 'foo', | ||
Tags: { baz: 'qux', corge: 'grault', quux: 'quuz' } | ||
checkIfStreamExists: expect.any(Function), | ||
confirmStreamTags: expect.any(Function), | ||
ensureStreamEncription: expect.any(Function), | ||
ensureStreamExists: expect.any(Function), | ||
getStreamShards: expect.any(Function) | ||
}); | ||
expect(logger.debug.mock.calls).toEqual([ | ||
['Checking if the stream "foo" is already tagged…'], | ||
['The stream tags have been updated.'] | ||
]); | ||
}); | ||
|
||
test('tag updates the stream with the previous and new tags', async () => { | ||
ctx.tags = { corge: 'grault' }; | ||
const mockStream = { | ||
StreamName: 'foo', | ||
StreamStatus: 'ACTIVE', | ||
Tags: [{ Key: 'baz', Value: 'qux' }] | ||
}; | ||
mockStreams().push(mockStream); | ||
await stream.tag(ctx); | ||
expect(client.addTagsToStream).toHaveBeenCalledWith({ | ||
StreamName: 'foo', | ||
Tags: { baz: 'qux', corge: 'grault' } | ||
}); | ||
expect(logger.debug.mock.calls).toEqual([ | ||
['Checking if the stream "foo" is already tagged…'], | ||
['The stream tags have been updated.'] | ||
]); | ||
}); | ||
|
||
test("tag won't update the stream if the previous and new tags are the same", async () => { | ||
ctx.tags = { baz: 'qux', quux: 'quuz', corge: 'grault' }; | ||
const mockStream = { | ||
StreamName: 'foo', | ||
StreamStatus: 'ACTIVE', | ||
Tags: [ | ||
{ Key: 'baz', Value: 'qux' }, | ||
{ Key: 'quux', Value: 'quuz' }, | ||
{ Key: 'corge', Value: 'grault' } | ||
] | ||
}; | ||
mockStreams().push(mockStream); | ||
await stream.tag(ctx); | ||
expect(client.addTagsToStream).not.toHaveBeenCalled(); | ||
expect(logger.debug.mock.calls).toEqual([ | ||
['Checking if the stream "foo" is already tagged…'], | ||
['The stream is already tagged as required.'] | ||
]); | ||
}); | ||
}); |
Oops, something went wrong.