Skip to content

Commit

Permalink
fix(ext/node): send data frame with end_stream flag on _final call (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
satyarohith committed Jun 10, 2024
1 parent 3be0a1e commit 4fd3d5a
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
4 changes: 2 additions & 2 deletions ext/node/ops/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,15 +344,15 @@ pub async fn op_http2_client_send_data(
state: Rc<RefCell<OpState>>,
#[smi] stream_rid: ResourceId,
#[buffer] data: JsBuffer,
end_of_stream: bool,
) -> Result<(), AnyError> {
let resource = state
.borrow()
.resource_table
.get::<Http2ClientStream>(stream_rid)?;
let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await;

// TODO(bartlomieju): handle end of stream
stream.send_data(data.to_vec().into(), false)?;
stream.send_data(data.to_vec().into(), end_of_stream)?;
Ok(())
}

Expand Down
23 changes: 19 additions & 4 deletions ext/node/polyfills/http2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,7 @@ export class ClientHttp2Stream extends Duplex {
return;
}

shutdownWritable(this, cb);
shutdownWritable(this, cb, this.#rid);
}

// TODO(bartlomieju): needs a proper cleanup
Expand Down Expand Up @@ -1176,15 +1176,30 @@ export class ClientHttp2Stream extends Duplex {
}
}

function shutdownWritable(stream, callback) {
function shutdownWritable(stream, callback, streamRid) {
debugHttp2(">>> shutdownWritable", callback);
const state = stream[kState];
if (state.shutdownWritableCalled) {
debugHttp2(">>> shutdownWritable() already called");
return callback();
}
state.shutdownWritableCalled = true;
onStreamTrailers(stream);
callback();
if (state.flags & STREAM_FLAGS_HAS_TRAILERS) {
onStreamTrailers(stream);
callback();
} else {
op_http2_client_send_data(streamRid, new Uint8Array(), true)
.then(() => {
callback();
stream[kMaybeDestroy]();
core.tryClose(streamRid);
})
.catch((e) => {
callback(e);
core.tryClose(streamRid);
stream._destroy(e);
});
}
// TODO(bartlomieju): might have to add "finish" event listener here,
// check it.
}
Expand Down

0 comments on commit 4fd3d5a

Please sign in to comment.