Skip to content

Commit

Permalink
Recover 100% test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
eaviles committed Aug 21, 2019
1 parent 644e9a6 commit 774af91
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 178 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/publish-module.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ jobs:
with:
version: 10
- name: Install
run: DISABLE_OPENCOLLECTIVE=true npm ci --loglevel=error
run: npm ci --loglevel=error
env:
DISABLE_OPENCOLLECTIVE: true
- name: Run linter
run: npm run eslint
- name: Run tests
Expand All @@ -35,7 +37,9 @@ jobs:
version: 10
registry-url: 'https://registry.npmjs.org'
- name: Install
run: DISABLE_OPENCOLLECTIVE=true npm ci --loglevel=error
run: npm ci --loglevel=error
env:
DISABLE_OPENCOLLECTIVE: true
- name: Publish
run: npm publish
env:
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/pull-request-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ jobs:
with:
version: ${{ matrix.node_version }}
- name: Install
run: DISABLE_OPENCOLLECTIVE=true npm ci --loglevel=error
run: npm ci --loglevel=error
env:
DISABLE_OPENCOLLECTIVE: true
- name: Run linter
run: npm run eslint
- name: Run tests
Expand Down
4 changes: 1 addition & 3 deletions lib/consumers-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,7 @@ class ConsumersManager {
await consumer.start();
} catch (err) {
logger.error('Unexpected recoverable error when trying to start a consumer:', err);
if (consumers[shardId]) {
consumers[shardId].stop();
}
consumers[shardId].stop();
consumers[shardId] = undefined;
throw err;
}
Expand Down
100 changes: 70 additions & 30 deletions lib/consumers-manager.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ describe('lib/consumers-manager', () => {
let manager;
let ownedShards;

const logger = { debug: jest.fn() };
const logger = { debug: jest.fn(), error: jest.fn() };
const stateStore = {
getAssignedEnhancedConsumer: jest.fn(() => assignedEnhancedConsumer),
getOwnedShards: jest.fn(() => ownedShards)
Expand All @@ -54,6 +54,7 @@ describe('lib/consumers-manager', () => {
FanOutConsumer.clearMocks();
PollingConsumer.clearMocks();
logger.debug.mockClear();
logger.error.mockClear();
stateStore.getAssignedEnhancedConsumer.mockClear();
stateStore.getOwnedShards.mockClear();
});
Expand All @@ -70,10 +71,10 @@ describe('lib/consumers-manager', () => {
expect(PollingConsumer).toHaveBeenCalledWith(expect.objectContaining({ shardId: 'foo' }));
expect(PollingConsumer.getMocks().start).toHaveBeenCalled();

expect(logger.debug.mock.calls).toEqual([
['Reconciling shard consumers…'],
['Starting a consumer for "foo"…']
]);
const { debug } = logger;
expect(debug).toHaveBeenNthCalledWith(1, 'Reconciling shard consumers…');
expect(debug).toHaveBeenNthCalledWith(2, 'Starting a consumer for "foo"…');
expect(debug).toHaveBeenCalledTimes(2);
});

test('the lease expiration is updated if a polling consumer is already running', async () => {
Expand All @@ -92,14 +93,14 @@ describe('lib/consumers-manager', () => {
expect(PollingConsumer.getMocks().start).not.toHaveBeenCalled();
expect(PollingConsumer.getMocks().updateLeaseExpiration).toHaveBeenCalledWith(2);

expect(logger.debug.mock.calls).toEqual([
['Reconciling shard consumers…'],
['Starting a consumer for "foo"…'],
['Reconciling shard consumers…']
]);
const { debug } = logger;
expect(debug).toHaveBeenNthCalledWith(1, 'Reconciling shard consumers…');
expect(debug).toHaveBeenNthCalledWith(2, 'Starting a consumer for "foo"…');
expect(debug).toHaveBeenNthCalledWith(3, 'Reconciling shard consumers…');
expect(debug).toHaveBeenCalledTimes(3);
});

test('the manager should stop consumers for no longer owned shards', async () => {
test('the manager stops consumers for no longer owned shards', async () => {
manager = new ConsumersManager({ logger, stateStore });
await manager.reconcile();
PollingConsumer.clearMocks();
Expand All @@ -110,13 +111,52 @@ describe('lib/consumers-manager', () => {
expect(PollingConsumer).not.toHaveBeenCalled();
expect(PollingConsumer.getMocks().stop).toHaveBeenCalled();

expect(logger.debug.mock.calls).toEqual([
['Reconciling shard consumers…'],
['Starting a consumer for "foo"…'],
['Reconciling shard consumers…'],
['Stopping the consumer for "foo"…'],
['Reconciling shard consumers…']
]);
const { debug } = logger;
expect(debug).toHaveBeenNthCalledWith(1, 'Reconciling shard consumers…');
expect(debug).toHaveBeenNthCalledWith(2, 'Starting a consumer for "foo"…');
expect(debug).toHaveBeenNthCalledWith(3, 'Reconciling shard consumers…');
expect(debug).toHaveBeenNthCalledWith(4, 'Stopping the consumer for "foo"…');
expect(debug).toHaveBeenNthCalledWith(5, 'Reconciling shard consumers…');
expect(debug).toHaveBeenCalledTimes(5);
});

test('the manager recovers from errors when stopping consumers', async () => {
manager = new ConsumersManager({ logger, stateStore });
await manager.reconcile();
PollingConsumer.clearMocks();
ownedShards = {};

PollingConsumer.getMocks().stop.mockImplementationOnce(() => {
throw new Error('foo');
});

await expect(manager.reconcile()).resolves.toBeUndefined();

const { error } = logger;
expect(error).toHaveBeenNthCalledWith(
1,
'Unexpected recoverable failure when trying to stop a consumer:',
expect.objectContaining({ message: 'foo' })
);
expect(error).toHaveBeenCalledTimes(1);
});

test('reconcile throws when starting consumers throws', async () => {
manager = new ConsumersManager({ logger, stateStore });

PollingConsumer.getMocks().start.mockImplementationOnce(() => {
throw new Error('foo');
});

await expect(manager.reconcile()).rejects.toEqual(expect.objectContaining({ message: 'foo' }));

const { error } = logger;
expect(error).toHaveBeenNthCalledWith(
1,
'Unexpected recoverable error when trying to start a consumer:',
expect.objectContaining({ message: 'foo' })
);
expect(error).toHaveBeenCalledTimes(1);
});

test('the manager is able to instantiate and start fan-out consumers as needed', async () => {
Expand All @@ -126,13 +166,13 @@ describe('lib/consumers-manager', () => {
expect(FanOutConsumer).toHaveBeenCalledWith(expect.objectContaining({ shardId: 'foo' }));
expect(FanOutConsumer.getMocks().start).toHaveBeenCalled();

expect(logger.debug.mock.calls).toEqual([
['Reconciling shard consumers…'],
['Starting a consumer for "foo"…']
]);
const { debug } = logger;
expect(debug).toHaveBeenNthCalledWith(1, 'Reconciling shard consumers…');
expect(debug).toHaveBeenNthCalledWith(2, 'Starting a consumer for "foo"…');
expect(debug).toHaveBeenCalledTimes(2);
});

test('all fan-out consumers should stop if the assigned enhanced consumer is lost', async () => {
test('all fan-out consumers stop if the assigned enhanced consumer is lost', async () => {
manager = new ConsumersManager({ logger, stateStore, useEnhancedFanOut: true });
await manager.reconcile();
FanOutConsumer.clearMocks();
Expand All @@ -142,15 +182,15 @@ describe('lib/consumers-manager', () => {
expect(FanOutConsumer).not.toHaveBeenCalled();
expect(FanOutConsumer.getMocks().stop).toHaveBeenCalled();

expect(logger.debug.mock.calls).toEqual([
['Reconciling shard consumers…'],
['Starting a consumer for "foo"…'],
['Reconciling shard consumers…'],
['Stopping the consumer for "foo"…']
]);
const { debug } = logger;
expect(debug).toHaveBeenNthCalledWith(1, 'Reconciling shard consumers…');
expect(debug).toHaveBeenNthCalledWith(2, 'Starting a consumer for "foo"…');
expect(debug).toHaveBeenNthCalledWith(3, 'Reconciling shard consumers…');
expect(debug).toHaveBeenNthCalledWith(4, 'Stopping the consumer for "foo"…');
expect(debug).toHaveBeenCalledTimes(4);
});

test('the manager should stop all consumers when asked to stop', async () => {
test('the manager stops all consumers when asked to stop', async () => {
manager = new ConsumersManager({ logger, stateStore });
await manager.reconcile();
manager.stop();
Expand Down
2 changes: 2 additions & 0 deletions lib/fan-out-consumer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,9 @@ describe('lib/fan-out-consumer', () => {
test('a consumer can be stopped twice', async () => {
const consumer = new FanOutConsumer(options);
consumer.start();
await nextTickWait();
consumer.stop();
await nextTickWait();
consumer.stop();
expect(setTimeout).toHaveBeenCalledTimes(1);
expect(clearTimeout).toHaveBeenCalledTimes(3);
Expand Down
92 changes: 50 additions & 42 deletions lib/heartbeat-manager.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,23 @@ const HeartbeatManager = require('./heartbeat-manager');

describe('lib/heartbeat-manager', () => {
const debug = jest.fn();
const logger = { debug };
const error = jest.fn();
const logger = { debug, error };
const clearOldConsumers = jest.fn();
const registerConsumer = jest.fn();
const stateStore = { clearOldConsumers, registerConsumer };

function nextTickWait() {
return new Promise(resolve => setImmediate(resolve));
}

beforeEach(() => {
jest.useFakeTimers();
});

afterEach(() => {
debug.mockClear();
error.mockClear();
clearOldConsumers.mockClear();
registerConsumer.mockClear();
});
Expand All @@ -24,75 +30,77 @@ describe('lib/heartbeat-manager', () => {
expect(HeartbeatManager).toThrow('Class constructor');
});

test('the heartbeat manager does the expected after starting it', async done => {
test('the heartbeat manager does the expected after starting it', async () => {
const manager = new HeartbeatManager({ logger, stateStore });
await manager.start();
jest.runOnlyPendingTimers();
setImmediate(() => {
expect(clearOldConsumers).toHaveBeenCalledWith(expect.any(Number));
expect(clearOldConsumers).toHaveBeenCalledTimes(2);
expect(registerConsumer).toHaveBeenCalled();
expect(registerConsumer).toHaveBeenCalledTimes(2);
expect(debug).toHaveBeenCalled();
expect(debug).toHaveBeenCalledTimes(2);
manager.stop();
done();
});
await nextTickWait();
expect(clearOldConsumers).toHaveBeenCalledWith(expect.any(Number));
expect(clearOldConsumers).toHaveBeenCalledTimes(2);
expect(registerConsumer).toHaveBeenCalled();
expect(registerConsumer).toHaveBeenCalledTimes(2);
expect(debug).toHaveBeenCalled();
expect(debug).toHaveBeenCalledTimes(2);
manager.stop();
});

test('the heartbeat manager can be started multiple times without problems', async done => {
test('the heartbeat manager can be started multiple times without problems', async () => {
const manager = new HeartbeatManager({ logger, stateStore });
await manager.start();
await manager.start();
jest.runOnlyPendingTimers();
setImmediate(() => {
expect(clearOldConsumers).toHaveBeenCalledTimes(2);
expect(registerConsumer).toHaveBeenCalledTimes(2);
expect(debug).toHaveBeenCalledTimes(2);
manager.stop();
done();
});
await nextTickWait();
expect(clearOldConsumers).toHaveBeenCalledTimes(2);
expect(registerConsumer).toHaveBeenCalledTimes(2);
expect(debug).toHaveBeenCalledTimes(2);
manager.stop();
});

test('the heartbeat manager can be stopped', async done => {
test('the heartbeat manager can be stopped', async () => {
const manager = new HeartbeatManager({ logger, stateStore });
await manager.start();
manager.stop();
jest.runOnlyPendingTimers();
setImmediate(() => {
expect(clearOldConsumers).toHaveBeenCalledTimes(1);
expect(registerConsumer).toHaveBeenCalledTimes(1);
expect(debug).toHaveBeenCalledTimes(1);
done();
});
await nextTickWait();
expect(clearOldConsumers).toHaveBeenCalledTimes(1);
expect(registerConsumer).toHaveBeenCalledTimes(1);
expect(debug).toHaveBeenCalledTimes(1);
});

test('the heartbeat manager can be stopped multiple times without problems', async done => {
test('the heartbeat manager can be stopped multiple times without problems', async () => {
const manager = new HeartbeatManager({ logger, stateStore });
await manager.start();
manager.stop();
manager.stop();
jest.runOnlyPendingTimers();
setImmediate(() => {
expect(clearOldConsumers).toHaveBeenCalledTimes(1);
expect(registerConsumer).toHaveBeenCalledTimes(1);
expect(debug).toHaveBeenCalledTimes(1);
done();
});
await nextTickWait();
expect(clearOldConsumers).toHaveBeenCalledTimes(1);
expect(registerConsumer).toHaveBeenCalledTimes(1);
expect(debug).toHaveBeenCalledTimes(1);
});

test('the heartbeat manager can resume after being stopped', async done => {
test('the heartbeat manager can resume after being stopped', async () => {
const manager = new HeartbeatManager({ logger, stateStore });
await manager.start();
manager.stop();
await manager.start();
jest.runOnlyPendingTimers();
setImmediate(() => {
expect(clearOldConsumers).toHaveBeenCalledTimes(3);
expect(registerConsumer).toHaveBeenCalledTimes(3);
expect(debug).toHaveBeenCalledTimes(3);
manager.stop();
done();
});
await nextTickWait();
expect(clearOldConsumers).toHaveBeenCalledTimes(3);
expect(registerConsumer).toHaveBeenCalledTimes(3);
expect(debug).toHaveBeenCalledTimes(3);
manager.stop();
});

test('the heartbeat manager can recover from thrown errors', async () => {
const manager = new HeartbeatManager({ logger, stateStore });
registerConsumer.mockRejectedValueOnce(new Error('foo'));
await expect(manager.start()).resolves.toBeUndefined();
expect(error).toHaveBeenNthCalledWith(
1,
'Unexpected recoverable failure when trying to send a hearbeat:',
expect.objectContaining({ message: 'foo' })
);
expect(error).toHaveBeenCalledTimes(1);
});
});
Loading

0 comments on commit 774af91

Please sign in to comment.