Skip to content

Commit

Permalink
poll_ready pending after WouldBlock errors
Browse files Browse the repository at this point in the history
  • Loading branch information
alexheretic authored and sdroege committed Aug 8, 2023
1 parent 4ada42d commit dcefd9b
Showing 1 changed file with 22 additions and 5 deletions.
27 changes: 22 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ pub struct WebSocketStream<S> {
inner: WebSocket<AllowStd<S>>,
closing: bool,
ended: bool,
/// Tungstenite is probably ready to receive more data.
///
/// `false` once start_send hits `WouldBlock` errors.
/// `true` initially and after `flush`ing.
ready: bool,
}

impl<S> WebSocketStream<S> {
Expand Down Expand Up @@ -262,10 +267,11 @@ impl<S> WebSocketStream<S> {
}

pub(crate) fn new(ws: WebSocket<AllowStd<S>>) -> Self {
WebSocketStream {
Self {
inner: ws,
closing: false,
ended: false,
ready: true,
}
}

Expand Down Expand Up @@ -369,25 +375,35 @@ where
type Error = WsError;

fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
if self.ready {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}

fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
match (*self).with_context(None, |s| s.write(item)) {
Ok(()) => Ok(()),
Ok(()) => {
self.ready = true;
Ok(())
}
Err(WsError::Io(err)) if err.kind() == std::io::ErrorKind::WouldBlock => {
// the message was accepted and queued
// isn't an error.
// the message was accepted and queued so not an error
// but `poll_ready` will start returning pending now.
self.ready = false;
Ok(())
}
Err(e) => {
self.ready = true;
debug!("websocket start_send error: {}", e);
Err(e)
}
}
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.ready = true;
(*self)
.with_context(Some((ContextWaker::Write, cx)), |s| cvt(s.flush()))
.map(|r| {
Expand All @@ -400,6 +416,7 @@ where
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.ready = true;
let res = if self.closing {
// After queueing it, we call `flush` to drive the close handshake to completion.
(*self).with_context(Some((ContextWaker::Write, cx)), |s| s.flush())
Expand Down

0 comments on commit dcefd9b

Please sign in to comment.