Skip to content

Commit

Permalink
Fix Stream.pipeline(...) usage and properly detect aborted requests (
Browse files Browse the repository at this point in the history
  • Loading branch information
szmarczak authored and sindresorhus committed Nov 1, 2019
1 parent 7dc42e6 commit abd95a3
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 31 deletions.
39 changes: 26 additions & 13 deletions source/request-as-event-emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,17 @@ export default (options: NormalizedOptions, input?: TransformStream) => {

currentRequest = request;

request.on('error', error => {
if (typeof request.aborted === 'number' || error.message === 'socket hang up') {
const onError = (error: Error): void => {
const isTimedOutError = error instanceof TimedOutTimeoutError;

// `request.aborted` is a boolean since v11.0.0: https://github.com/nodejs/node/commit/4b00c4fafaa2ae8c41c1f78823c0feb810ae4723#diff-e3bc37430eb078ccbafe3aa3b570c91a
// We need to allow `TimedOutTimeoutError` here, because it `stream.pipeline(..)` aborts it automatically.
if (!isTimedOutError && (typeof request.aborted === 'number' || (request.aborted as unknown as boolean) === true)) {
return;
}

if (error instanceof TimedOutTimeoutError) {
if (isTimedOutError) {
// @ts-ignore TS is dumb.
error = new TimeoutError(error, timings, options);
} else {
error = new RequestError(error, options);
Expand All @@ -208,7 +213,22 @@ export default (options: NormalizedOptions, input?: TransformStream) => {
if (emitter.retry(error) === false) {
emitError(error);
}
});
};

const uploadComplete = (error?: Error): void => {
if (error) {
onError(error);
return;
}

// No need to attach an error handler here,
// as `stream.pipeline(...)` doesn't remove this handler
// to allow stream reuse.

request.emit('upload-complete');
};

request.on('error', onError);

timings = timer(request);

Expand All @@ -220,20 +240,13 @@ export default (options: NormalizedOptions, input?: TransformStream) => {

emitter.emit('request', request);

const uploadComplete = (error?: Error): void => {
if (error) {
emitError(new RequestError(error, options));
return;
}

request.emit('upload-complete');
};

try {
if (is.nodeStream(options.body)) {
const {body} = options;
delete options.body;

// `stream.pipeline(...)` does it for us.
request.removeListener('error', onError);
stream.pipeline(
body,
request,
Expand Down
5 changes: 1 addition & 4 deletions source/utils/timed-out.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ export default (request: ClientRequest, delays: Delays, options: TimedOutOptions
const timeout: NodeJS.Timeout = setTimeout(() => {
// @ts-ignore https://github.com/microsoft/TypeScript/issues/26113
immediate = setImmediate(callback, delay, ...args);
/* istanbul ignore next: added in node v9.7.0 */
if (immediate.unref) {
immediate.unref();
}
immediate.unref();
}, delay);

/* istanbul ignore next: in order to support electron renderer */
Expand Down
8 changes: 2 additions & 6 deletions test/gzip.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,13 @@ test('handles gzip error', withServer, async (t, server, got) => {
t.is(error.name, 'ReadError');
});

// FIXME: This causes an unhandled rejection.
// eslint-disable-next-line ava/no-skip-test
test.skip('handles gzip error - stream', withServer, async (t, server, got) => {
test('handles gzip error - stream', withServer, async (t, server, got) => {
server.get('/', (_request, response) => {
response.setHeader('Content-Encoding', 'gzip');
response.end('Not gzipped content');
});

const error = await t.throws(() => {
got.stream('');
}, 'incorrect header check');
const error = await t.throwsAsync(getStream(got.stream('')), 'incorrect header check');

// @ts-ignore
t.is(error.options.path, '/');
Expand Down
10 changes: 10 additions & 0 deletions test/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,13 @@ test('response contains got options', withServer, async (t, server, got) => {

t.is((await got(options)).request.options.auth, options.auth);
});

test('socket destroyed by the server throws ECONNRESET', withServer, async (t, server, got) => {
server.get('/', request => {
request.socket.destroy();
});

await t.throwsAsync(got('', {retry: 0}), {
code: 'ECONNRESET'
});
});
10 changes: 2 additions & 8 deletions test/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,7 @@ test.serial('send timeout', withServerAndLolex, async (t, server, got, clock) =>
);
});

// FIXME: This causes an unhandled rejection.
// eslint-disable-next-line ava/no-skip-test
test.serial.skip('send timeout (keepalive)', withServerAndLolex, async (t, server, got, clock) => {
test.serial('send timeout (keepalive)', withServerAndLolex, async (t, server, got, clock) => {
server.post('/', defaultHandler(clock));
server.get('/prime', (_request, response) => {
response.end('ok');
Expand Down Expand Up @@ -414,13 +412,9 @@ test.serial('no unhandled `socket hung up` errors', withServerAndLolex, async (t
server.get('/', defaultHandler(clock));

await t.throwsAsync(
got({retry: 0, timeout: requestDelay / 2}).on('request', () => {
clock.tick(requestDelay);
}),
got({retry: 0, timeout: requestDelay / 2}),
{instanceOf: got.TimeoutError}
);

clock.tick(requestDelay);
});

test.serial('no more timeouts after an error', withServerAndLolex, async (t, _server, got, clock) => {
Expand Down

0 comments on commit abd95a3

Please sign in to comment.