Skip to content

Commit

Permalink
Retry network errors on registry requests (elastic#74507)
Browse files Browse the repository at this point in the history
  • Loading branch information
John Schulz authored and thomasneirynck committed Aug 21, 2020
1 parent 20ba148 commit 989c87b
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { fetchUrl } from './requests';
import { RegistryError } from '../../../errors';
jest.mock('node-fetch');

const { Response, FetchError } = jest.requireActual('node-fetch');
// eslint-disable-next-line @typescript-eslint/no-var-requires
const fetchMock = require('node-fetch') as jest.Mock;

jest.setTimeout(120 * 1000);
describe('setupIngestManager', () => {
beforeEach(async () => {});

afterEach(async () => {
jest.clearAllMocks();
});

describe('fetchUrl / getResponse errors', () => {
it('regular Errors do not retry. Becomes RegistryError', async () => {
fetchMock.mockImplementationOnce(() => {
throw new Error('mocked');
});
const promise = fetchUrl('');
await expect(promise).rejects.toThrow(RegistryError);
expect(fetchMock).toHaveBeenCalledTimes(1);
});

it('TypeErrors do not retry. Becomes RegistryError', async () => {
fetchMock.mockImplementationOnce(() => {
// @ts-expect-error
null.f();
});
const promise = fetchUrl('');
await expect(promise).rejects.toThrow(RegistryError);
expect(fetchMock).toHaveBeenCalledTimes(1);
});

describe('only system errors retry (like ECONNRESET)', () => {
it('they eventually succeed', async () => {
const successValue = JSON.stringify({ name: 'attempt 4 works', version: '1.2.3' });
fetchMock
.mockImplementationOnce(() => {
throw new FetchError('message 1', 'system', { code: 'ESOMETHING' });
})
.mockImplementationOnce(() => {
throw new FetchError('message 2', 'system', { code: 'ESOMETHING' });
})
.mockImplementationOnce(() => {
throw new FetchError('message 3', 'system', { code: 'ESOMETHING' });
})
// this one succeeds
.mockImplementationOnce(() => Promise.resolve(new Response(successValue)))
.mockImplementationOnce(() => {
throw new FetchError('message 5', 'system', { code: 'ESOMETHING' });
})
.mockImplementationOnce(() => {
throw new FetchError('message 6', 'system', { code: 'ESOMETHING' });
});

const promise = fetchUrl('');
await expect(promise).resolves.toEqual(successValue);
// doesn't retry after success
expect(fetchMock).toHaveBeenCalledTimes(4);
const actualResultsOrder = fetchMock.mock.results.map(({ type }: { type: string }) => type);
expect(actualResultsOrder).toEqual(['throw', 'throw', 'throw', 'return']);
});

it('or error after 1 failure & 5 retries with RegistryError', async () => {
fetchMock
.mockImplementationOnce(() => {
throw new FetchError('message 1', 'system', { code: 'ESOMETHING' });
})
.mockImplementationOnce(() => {
throw new FetchError('message 2', 'system', { code: 'ESOMETHING' });
})
.mockImplementationOnce(() => {
throw new FetchError('message 3', 'system', { code: 'ESOMETHING' });
})
.mockImplementationOnce(() => {
throw new FetchError('message 4', 'system', { code: 'ESOMETHING' });
})
.mockImplementationOnce(() => {
throw new FetchError('message 5', 'system', { code: 'ESOMETHING' });
})
.mockImplementationOnce(() => {
throw new FetchError('message 6', 'system', { code: 'ESOMETHING' });
})
.mockImplementationOnce(() => {
throw new FetchError('message 7', 'system', { code: 'ESOMETHING' });
})
.mockImplementationOnce(() => {
throw new FetchError('message 8', 'system', { code: 'ESOMETHING' });
});

const promise = fetchUrl('');
await expect(promise).rejects.toThrow(RegistryError);
// doesn't retry after 1 failure & 5 failed retries
expect(fetchMock).toHaveBeenCalledTimes(6);
const actualResultsOrder = fetchMock.mock.results.map(({ type }: { type: string }) => type);
expect(actualResultsOrder).toEqual(['throw', 'throw', 'throw', 'throw', 'throw', 'throw']);
});
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,49 @@
* you may not use this file except in compliance with the Elastic License.
*/

import fetch, { Response } from 'node-fetch';
import fetch, { FetchError, Response } from 'node-fetch';
import pRetry from 'p-retry';
import { streamToString } from './streams';
import { RegistryError } from '../../../errors';

type FailedAttemptErrors = pRetry.FailedAttemptError | FetchError | Error;

// not sure what to call this function, but we're not exporting it
async function registryFetch(url: string) {
const response = await fetch(url);

if (response.ok) {
return response;
} else {
// 4xx & 5xx responses
// exit without retry & throw RegistryError
throw new pRetry.AbortError(
new RegistryError(`Error connecting to package registry at ${url}: ${response.statusText}`)
);
}
}

export async function getResponse(url: string): Promise<Response> {
try {
const response = await fetch(url);
if (response.ok) {
return response;
} else {
throw new RegistryError(
`Error connecting to package registry at ${url}: ${response.statusText}`
);
}
// we only want to retry certain failures like network issues
// the rest should only try the one time then fail as they do now
const response = await pRetry(() => registryFetch(url), {
factor: 2,
retries: 5,
onFailedAttempt: (error) => {
// we only want to retry certain types of errors, like `ECONNREFUSED` and other operational errors
// and let the others through without retrying
//
// throwing in onFailedAttempt will abandon all retries & fail the request
// we only want to retry system errors, so throw a RegistryError for everything else
if (!isSystemError(error)) {
throw new RegistryError(
`Error connecting to package registry at ${url}: ${error.message}`
);
}
},
});
return response;
} catch (e) {
throw new RegistryError(`Error connecting to package registry at ${url}: ${e.message}`);
}
Expand All @@ -31,3 +60,14 @@ export async function getResponseStream(url: string): Promise<NodeJS.ReadableStr
export async function fetchUrl(url: string): Promise<string> {
return getResponseStream(url).then(streamToString);
}

// node-fetch throws a FetchError for those types of errors and
// "All errors originating from Node.js core are marked with error.type = 'system'"
// https://github.com/node-fetch/node-fetch/blob/master/docs/ERROR-HANDLING.md#error-handling-with-node-fetch
function isFetchError(error: FailedAttemptErrors): error is FetchError {
return error instanceof FetchError || error.name === 'FetchError';
}

function isSystemError(error: FailedAttemptErrors): boolean {
return isFetchError(error) && error.type === 'system';
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ export function bufferToStream(buffer: Buffer): PassThrough {
return stream;
}

export function streamToString(stream: NodeJS.ReadableStream): Promise<string> {
export function streamToString(stream: NodeJS.ReadableStream | Buffer): Promise<string> {
if (stream instanceof Buffer) return Promise.resolve(stream.toString());
return new Promise((resolve, reject) => {
const body: string[] = [];
stream.on('data', (chunk: string) => body.push(chunk));
Expand Down

0 comments on commit 989c87b

Please sign in to comment.