stream: pipeline wait for close before calling the callback #53462

Open
wants to merge 1 commit into
base: main

jakecastelli
Contributor

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.

stream.pipeline() closes all the streams when an error is raised

pipeline doc: here

The pipeline should wait for the `close` event before calling the callback, since that is the proper "fully done" state.

`finishCount` should never drop below 0 (become negative) when `finishImpl` is called.

Fixes: #51540
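
To make the ordering question concrete, here is a minimal sketch that records whether the pipeline callback or the destination's `close` event arrives first. The helper name `observePipelineOrder` and the `cleanupMs` delay are hypothetical and exist only for illustration; the slow `destroy()` stands in for any asynchronous cleanup.

```javascript
const { pipeline, Readable, Writable } = require('node:stream');

// Hypothetical helper: runs a two-stream pipeline whose source fails and whose
// destination needs `cleanupMs` to tear down, then reports the event order.
function observePipelineOrder(cleanupMs = 20) {
  return new Promise((resolve) => {
    const events = [];
    let pending = 2; // wait for both the callback and the 'close' event

    const done = (label) => {
      events.push(label);
      if (--pending === 0) resolve(events);
    };

    const src = new Readable({
      read() {
        // Fail on the first read so the pipeline has to tear everything down.
        this.destroy(new Error('boom'));
      },
    });

    const dst = new Writable({
      write(chunk, encoding, cb) { cb(); },
      destroy(err, cb) {
        // Simulated asynchronous cleanup; 'close' is emitted only after cb().
        setTimeout(() => cb(err), cleanupMs);
      },
    });

    // Defensive no-op listener so the demo cannot crash on a re-emitted error.
    dst.on('error', () => {});
    dst.on('close', () => done('dst close'));
    pipeline(src, dst, (err) => done(`callback: ${err && err.message}`));
  });
}

observePipelineOrder().then((events) => console.log(events));
```

The sketch deliberately asserts nothing about the order. According to the linked issue, the callback entry is reported to appear before `dst close`; the intent of this PR is for the callback to come last.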

@nodejs-github-bot
Collaborator

Review requested:

  • @nodejs/streams

@nodejs-github-bot nodejs-github-bot added needs-ci PRs that need a full CI run. stream Issues and PRs related to the stream subsystem. labels Jun 15, 2024
@jakecastelli
Contributor Author

I have spent a lot of time looking into this issue and would really appreciate your comments or feedback 🙏
@wh0 @lpinca @ronag

@wh0
Contributor

wh0 commented Jun 15, 2024

This is different from what I was picturing, and I wonder if it works with a pipeline that has some number of PassThroughs in between.

The problem as I see it is that over time, various new places have been added that call finish that circumstantially deliver specific error conditions and which don't fit into the finishCount logic. I see the fix being along the lines of this:

diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js
index bb34759..83c53d8 100644
--- a/lib/internal/streams/pipeline.js
+++ b/lib/internal/streams/pipeline.js
@@ -225,6 +225,10 @@ function pipelineImpl(streams, callback, opts) {
     finishImpl(err, --finishCount === 0);
   }
 
+  function finishOnlyHandleError(err) {
+    finishImpl(err, false);
+  }
+
   function finishImpl(err, final) {
     if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {
       error = err;
@@ -279,7 +283,7 @@ function pipelineImpl(streams, callback, opts) {
           err.name !== 'AbortError' &&
           err.code !== 'ERR_STREAM_PREMATURE_CLOSE'
         ) {
-          finish(err);
+          finishOnlyHandleError(err);
         }
       }
       stream.on('error', onError);
@@ -372,7 +376,7 @@ function pipelineImpl(streams, callback, opts) {
     } else if (isNodeStream(stream)) {
       if (isReadableNodeStream(ret)) {
         finishCount += 2;
-        const cleanup = pipe(ret, stream, finish, { end });
+        const cleanup = pipe(ret, stream, finish, finishOnlyHandleError, { end });
         if (isReadable(stream) && isLastStream) {
           lastStreamCleanup.push(cleanup);
         }
@@ -415,12 +419,12 @@ function pipelineImpl(streams, callback, opts) {
   return ret;
 }
 
-function pipe(src, dst, finish, { end }) {
+function pipe(src, dst, finish, finishOnlyHandleError, { end }) {
   let ended = false;
   dst.on('close', () => {
     if (!ended) {
       // Finish if the destination closes before the source has completed.
-      finish(new ERR_STREAM_PREMATURE_CLOSE());
+      finishOnlyHandleError(new ERR_STREAM_PREMATURE_CLOSE());
     }
   });

(not tested)
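
The idea behind the patch can be modelled in isolation. The following is a simplified stand-in for the bookkeeping in `pipelineImpl`, not the real code: `createFinishTracker` and `demoFinishTracker` are hypothetical names, and the stream-destroying side effects of the real `finishImpl` are left out. It only shows that an error reported through `finishOnlyHandleError` is remembered without consuming one of the counted completions.

```javascript
// Simplified model of the finishCount bookkeeping; an illustration only.
function createFinishTracker(expected, callback) {
  let finishCount = expected;
  let error;
  let called = false;

  function finishImpl(err, final) {
    // Keep the first real error; a premature-close error may be overwritten.
    if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {
      error = err;
    }
    if (final && !called) {
      called = true;
      callback(error);
    }
  }

  return {
    // Counted completion: one call per end-of-stream notification.
    finish(err) { finishImpl(err, --finishCount === 0); },
    // Uncounted error report: records the error, never completes the pipeline.
    finishOnlyHandleError(err) { finishImpl(err, false); },
    get remaining() { return finishCount; },
  };
}

function demoFinishTracker() {
  const results = [];
  const tracker = createFinishTracker(2, (err) => {
    results.push(err ? err.message : null);
  });
  tracker.finishOnlyHandleError(new Error('write failed')); // error listener fires
  results.push(`remaining after error: ${tracker.remaining}`);
  tracker.finish(); // source reached end-of-stream
  tracker.finish(); // destination reached end-of-stream
  return results;
}

console.log(demoFinishTracker());
// → [ 'remaining after error: 2', 'write failed' ]
```

Because the error path no longer decrements the counter, the callback in this model fires only after both counted completions have arrived, and the counter cannot go negative.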

Member

@mcollina mcollina left a comment

lgtm

@mcollina mcollina added the request-ci Add this label to start a Jenkins CI on a PR. label Jun 15, 2024
@github-actions github-actions bot removed the request-ci Add this label to start a Jenkins CI on a PR. label Jun 15, 2024
@jakecastelli jakecastelli marked this pull request as draft June 16, 2024 00:56
@jakecastelli
Contributor Author

jakecastelli commented Jun 16, 2024

@wh0 Thanks for spending time and looking into it!

> I wonder if it works with a pipeline that has some number of PassThroughs in between

That was a great catch! It WON'T work when I add one or more PassThroughs.

> The problem as I see it is that over time, various new places have been added that call finish that circumstantially deliver specific error conditions and which don't fit into the finishCount logic. I see the fix being along the lines of this

I see your point and I agree: over time it would become harder to work with. I have also tested your patch and it works as expected.

I will temporarily mark this PR as draft, take some more time to look into your patch, and add tests to cover the case where there are one or more PassThroughs.

@mcollina Thanks for looking into it, will bug you for review again once it's ready 😄 🙏

The pipeline should wait for close event to finish before calling
the callback.

The `finishCount` should not drop below 0 when the finish function is called.

Fixes: nodejs#51540

Co-authored-by: wh0 <wh0@users.noreply.github.com>
@jakecastelli
Contributor Author

I have added another test that includes a number of PassThroughs in between.
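
For readers who want to try the PassThrough scenario themselves, a rough sketch follows. It is not the test added in this PR; the helper name `runWithPassThroughs` and the sample chunks are made up for illustration. It builds a source, `count` PassThroughs, and a sink that fails on its first write, then reports the error seen by the callback and the `destroyed` flag of every stream.

```javascript
const { pipeline, PassThrough, Readable, Writable } = require('node:stream');

// Hypothetical helper: source -> `count` PassThroughs -> failing sink.
function runWithPassThroughs(count) {
  return new Promise((resolve) => {
    const src = Readable.from(['a', 'b', 'c']);
    const middle = Array.from({ length: count }, () => new PassThrough());
    const dst = new Writable({
      // Fail on the first chunk so the whole chain has to be torn down.
      write(chunk, encoding, cb) { cb(new Error('sink failed')); },
    });

    const streams = [src, ...middle, dst];
    pipeline(streams, (err) => {
      // Let any pending events run before sampling the stream state.
      setImmediate(() => resolve({
        message: err && err.message,
        destroyed: streams.map((stream) => stream.destroyed),
      }));
    });
  });
}

runWithPassThroughs(3).then((result) => console.log(result));
```

A fuller test would also attach `close` listeners to each stream and check that they all fired before the callback, which is the behaviour this PR is aiming for.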

@wh0 your fix was clean and way better than my original "monkey patch" fix. There was no issue with your patch; my previous message saying there was a small issue was just me waking up without coffee 😄. I absolutely learned a lot. I have added you as a co-author; please let me know if you are happy with me using your patch.

@jakecastelli jakecastelli marked this pull request as ready for review June 16, 2024 07:54
@ronag
Member

ronag commented Jun 16, 2024

I would like to properly review this. Please don't land yet.

@wh0
Contributor

wh0 commented Jun 16, 2024

thanks jakecastelli, I'm happy to have this patch used/adapted. please accept it under the same license as the rest of the nodejs project. glad to know this approach works even with the PassThroughs. ronag thanks for your time as well

Member

@mcollina mcollina left a comment

lgtm

@mcollina mcollina added the request-ci Add this label to start a Jenkins CI on a PR. label Jun 17, 2024
@github-actions github-actions bot removed the request-ci Add this label to start a Jenkins CI on a PR. label Jun 17, 2024
Member

@ronag ronag left a comment

I believe this will break legacy userland streams that don't always emit close. Pipeline does not guarantee that close is emitted on all streams before the callback is invoked. The current behavior is intentional. If the docs say otherwise, I believe we should fix those instead.

This was mentioned in the comments for the issue: #51540

I appreciate the effort and sorry for the late feedback.

@jakecastelli
Contributor Author

jakecastelli commented Jun 18, 2024

Thanks @ronag! This makes sense: when a stream's destroy never calls its callback, the pipeline callback would never be called either.

The current behaviour still feels slightly odd, because if destroy performs an async operation such as cleanup, the user cannot rely on the pipeline callback to know that the cleanup has finished. Even so, I believe this patch would introduce more potential issues than it fixes.

Thanks again for everyone's time on this, I will close this PR soon 🙏

@wh0
Contributor

wh0 commented Jun 18, 2024

> I believe this will break legacy user land streams that don't always emit close.

what kind of break is this referring to? e.g. never calling the callback?

@wh0
Contributor

wh0 commented Jun 18, 2024

the patch doesn't depend on close being emitted. the two finishCount decrements come from eos(src) and eos(dst). end-of-stream has its own tests that ensure it works properly on emitClose: false streams
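
The point about end-of-stream can be checked with the public `stream.finished()` API, which exposes the same end-of-stream helper. This is a small sketch, not one of the existing tests; the helper name `finishedWithoutClose` is hypothetical. It creates a writable with `emitClose: false`, ends it, and records whether the `finished()` callback ran and whether `close` was ever emitted.

```javascript
const { finished, Writable } = require('node:stream');

// Hypothetical helper: shows finished() completing on a stream that never
// emits 'close'.
function finishedWithoutClose() {
  return new Promise((resolve) => {
    let closeEmitted = false;

    const legacyLike = new Writable({
      emitClose: false, // mimic a stream that does not emit 'close'
      write(chunk, encoding, cb) { cb(); },
    });

    legacyLike.on('close', () => { closeEmitted = true; });

    finished(legacyLike, (err) => {
      // Wait one more turn so a late 'close' would still be observed.
      setImmediate(() => resolve({ err, closeEmitted }));
    });

    legacyLike.end('data');
  });
}

finishedWithoutClose().then((result) => console.log(result));
```

The callback runs without an error even though `closeEmitted` stays `false`, which is why counting end-of-stream completions does not hang on streams like this.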

@ronag
Member

ronag commented Jun 18, 2024

I will take another look.

@mcollina
Member

@ronag I think that case is handled.

Member

@ronag ronag left a comment

LGTM. Excellent work.

Successfully merging this pull request may close these issues.

stream.pipeline doesn't wait for 'close' on error
5 participants