-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stream: pipeline wait for close before calling the callback #53462
base: main
Are you sure you want to change the base?
Conversation
Review requested:
|
13d24b0
to
de856a4
Compare
This is different from what I was picturing, and I wonder if it works with a pipeline that has some number of The problem as I see it is that over time, various new places have been added that call diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js
index bb34759..83c53d8 100644
--- a/lib/internal/streams/pipeline.js
+++ b/lib/internal/streams/pipeline.js
@@ -225,6 +225,10 @@ function pipelineImpl(streams, callback, opts) {
finishImpl(err, --finishCount === 0);
}
+ function finishOnlyHandleError(err) {
+ finishImpl(err, false);
+ }
+
function finishImpl(err, final) {
if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {
error = err;
@@ -279,7 +283,7 @@ function pipelineImpl(streams, callback, opts) {
err.name !== 'AbortError' &&
err.code !== 'ERR_STREAM_PREMATURE_CLOSE'
) {
- finish(err);
+ finishOnlyHandleError(err);
}
}
stream.on('error', onError);
@@ -372,7 +376,7 @@ function pipelineImpl(streams, callback, opts) {
} else if (isNodeStream(stream)) {
if (isReadableNodeStream(ret)) {
finishCount += 2;
- const cleanup = pipe(ret, stream, finish, { end });
+ const cleanup = pipe(ret, stream, finish, finishOnlyHandleError, { end });
if (isReadable(stream) && isLastStream) {
lastStreamCleanup.push(cleanup);
}
@@ -415,12 +419,12 @@ function pipelineImpl(streams, callback, opts) {
return ret;
}
-function pipe(src, dst, finish, { end }) {
+function pipe(src, dst, finish, finishOnlyHandleError, { end }) {
let ended = false;
dst.on('close', () => {
if (!ended) {
// Finish if the destination closes before the source has completed.
- finish(new ERR_STREAM_PREMATURE_CLOSE());
+ finishOnlyHandleError(new ERR_STREAM_PREMATURE_CLOSE());
}
}); (not tested) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
@wh0 Thanks for spending time and looking into it!
That was a great catch! It WON'T work when I add one or more
I see your point and I agree, overtime it would become harder to work with. And I have also tested it your patch and it works just like expected. I will temporarily mark this PR as draft and take some more time to look into your patch and add additional tests to cover the case where there one or more @mcollina Thanks for looking into it, will bug you for review again once its ready 😄 🙏 |
The pipeline should wait for close event to finish before calling the callback. The `finishCount` should not below 0 when calling finish function. Fixes: nodejs#51540 Co-authored-by: wh0 <wh0@users.noreply.github.com>
de856a4
to
0154e63
Compare
I have added another new test to include a number of @wh0 your fix was clean and way better than my original "monkey patch" fix (there was no issue with your patch, my previous message saying there was a little bit issue was just me waking up without coffee 😄) absolutely learned a lot. I have added you as a co-author please let me know if you are happy with me using your patch. |
I would like to properly review this. Please don't land yet. |
thanks jakecastelli, I'm happy to have this patch used/adapted. please accept it under the same license as the rest of the nodejs project. glad to know this approach works even with the PassThroughs. ronag thanks for your time as well |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this will break legacy user land streams that don't always emit close. Pipeline does not guarantee that close is emitted on all streams before the callback is invoked. The current behavior is intentional. If the docs says otherwise I believe we should fix those instead.
This was mentioned in the comments for the issue: #51540
I appreciate the effort and sorry for the late feedback.
Thanks @ronag! This makes sense, when callback is not called in the Even though the current behaviour feels slightly weird as if there is any async operation in the Thanks again for everyone's time on this, I will close this PR soon 🙏 |
what kind of break is this referring to? e.g. never calling the callback? |
the patch doesn't depend on |
I will take another look. |
@ronag I think that case is handled. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Excellent work.
pipeline doc: here
The pipeline should wait for close event to finish before calling the callback as that should be the proper "fully done" state.
The
finishCount
should not below 0 when callingfinishImpl
function (should not be negative).Fixes: #51540